Pgwire
PostgreSQL client library for Deno and Node.js that exposes all features of the wire protocol.
- Memory efficient data streaming
- Logical replication, including pgoutput protocol
- Copy from stdin and to stdout
- Query cancellation
- Implicit-transaction multi-statement queries
- Listen/Notify
- Query pipelining, single round trip
- Efficient bytea transferring
- OOM protected
- Pure js without dependencies
Create connection
import { pgconnection } from 'https://raw.githubusercontent.com/exedealer/pgwire/main/mod.js';
// use exact commit or tag instead of main ^^^^
const pg = pgconnection('postgres://USER:PASSWORD@HOST:PORT/DATABASE');
https://www.postgresql.org/docs/18/libpq-connect.html#LIBPQ-CONNSTRING-URIS
A good practice is to read the connection URI from an environment variable:
// app.js
import { pgconnection } from 'https://raw.githubusercontent.com/exedealer/pgwire/main/mod.js';
const pg = pgconnection(Deno.env.get('POSTGRES'));
Set the POSTGRES environment variable when running the process:
$ POSTGRES='postgres://USER:PASSWORD@HOST:PORT/DATABASE' deno run --allow-env --allow-net app.js
The pgconnection() function also accepts parameters as an object:
const pg = pgconnection({
host: '127.0.0.1',
port: 5432,
user: 'postgres',
password: 'postgres',
database: 'postgres',
});
It's possible to pass multiple connection URIs or objects to the pgconnection() function. In this case the actual connection parameters are computed by merging all parameters with first-win priority. This technique can be used to force specific parameter values or to provide default fallback values:
const pg = pgconnection(Deno.env.get('POSTGRES'), {
// use default application_name if not set in env
application_name: 'my-awesome-app',
});
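The first-win rule above can be modelled with plain objects. The sketch below is only an illustration of the merge semantics, not pgwire's actual code; `mergeFirstWin` and the sample values are hypothetical.

```javascript
// Illustrative model of first-win merging; NOT pgwire's implementation.
// `mergeFirstWin` is a hypothetical helper name.
function mergeFirstWin(...sources) {
  const merged = {};
  for (const source of sources) {
    for (const [key, value] of Object.entries(source)) {
      // Keep the value from the earliest source that defines the key.
      if (!(key in merged)) merged[key] = value;
    }
  }
  return merged;
}

// Earlier arguments win, so later arguments behave as defaults.
const fromEnv = { host: 'db.internal', user: 'app' };
const defaults = { application_name: 'my-awesome-app', user: 'postgres' };
const params = mergeFirstWin(fromEnv, defaults);
// params.user comes from `fromEnv`;
// params.application_name falls back to `defaults`.
```

Under this model, a value you want to force goes in the first argument, and a value you want as a fallback goes in the last one.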
Don't forget to .end() the connection when you no longer need it:
const pg = pgconnection(Deno.env.get('POSTGRES'));
try {
// use `pg`
} finally {
await pg.end();
}
// or just
await using pg = pgconnection(Deno.env.get('POSTGRES'));
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/await_using
Using pgwire in a web server
// app.js
import { pgpool } from 'https://raw.githubusercontent.com/exedealer/pgwire/main/mod.js';
await using pg = pgpool(Deno.env.get('POSTGRES'));
const websrv = Deno.serve({ handler });
await websrv.finished;
async function handler(req) {
const [greeting] = await pg.query(`SELECT 'hello world, ' || now()`);
return new Response(greeting);
}
$ POSTGRES='postgres://USER:PASSWORD@HOST:PORT/DATABASE?_poolSize=4&_poolIdleTimeout=5min' deno run --allow-env --allow-net app.js
When _poolSize is not set, a new connection is created for each query. This makes it possible to switch to an external connection pool such as pgBouncer.
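Since the pool options travel in the query string of the connection URI, they can be toggled without touching application code. A minimal sketch, assuming the standard `URL` API; `withPoolOptions` and the sample credentials are hypothetical, and only `_poolSize` and `_poolIdleTimeout` come from the example above.

```javascript
// Hypothetical helper: append pool options to a base connection URI.
function withPoolOptions(uri, { poolSize, poolIdleTimeout } = {}) {
  const url = new URL(uri);
  if (poolSize != null) {
    url.searchParams.set('_poolSize', String(poolSize));
  }
  if (poolIdleTimeout != null) {
    url.searchParams.set('_poolIdleTimeout', poolIdleTimeout);
  }
  return url.href;
}

const base = 'postgres://app:secret@localhost:5432/appdb';
// Built-in pooling with up to 4 connections.
const pooled = withPoolOptions(base, { poolSize: 4, poolIdleTimeout: '5min' });
// No pool options: per the text above, one connection per query,
// which suits an external pooler.
const unpooled = withPoolOptions(base);
```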
Querying
const { rows } = await pg.query(`
SELECT i, 'Mississippi'
FROM generate_series(1, 3) i
`);
assertEquals(rows, [
[1, 'Mississippi'],
[2, 'Mississippi'],
[3, 'Mississippi'],
]);
Function call results and other single-value results can be read with array destructuring:
const [scalar] = await pg.query(`SELECT current_user`);
assertEquals(scalar, 'postgres');
Parametrized query
const { rows } = await pg.query({
statement: `
SELECT i, $1
FROM generate_series(1, $2) i
`,
params: [
{ type: 'text', value: 'Mississippi' }, // $1
{ type: 'int4', value: 3 }, // $2
],
});
assertEquals(rows, [
[1, 'Mississippi'],
[2, 'Mississippi'],
[3, 'Mississippi'],
]);
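Parameters are mainly useful for values that come from outside the program. A minimal sketch that wraps the query above in a function; `labeledSeries` is a hypothetical name, and `pg` is assumed to be any object exposing the `query()` method shown in this README.

```javascript
// Hypothetical helper built on the statement/params shape shown above.
async function labeledSeries(pg, label, count) {
  const { rows } = await pg.query({
    statement: `
      SELECT i, $1
      FROM generate_series(1, $2) i
    `,
    params: [
      { type: 'text', value: label }, // $1
      { type: 'int4', value: count }, // $2
    ],
  });
  return rows;
}
```

Because the label is sent as a parameter and never spliced into the SQL text, a call such as `await labeledSeries(pg, "O'Reilly", 3)` needs no manual quoting.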
TODO Why no interpolation API
Multi-statement queries
Postgres allows executing multiple statements within a single query.
const {
results: [
, // skip CREATE TABLE category
, // skip CREATE TABLE product
{ rows: categories },
{ rows: products },
],
} = await pg.query(`
-- let's generate sample data
CREATE TEMP TABLE category(id) AS VALUES
('fruits'),
('vegetables');
CREATE TEMP TABLE product(id, category_id) AS VALUES
('apple', 'fruits'),
('banana', 'fruits'),
('carrot', 'vegetables');
-- then select all generated data
SELECT id FROM category ORDER BY id;
SELECT id, category_id FROM product ORDER BY id;
`);
assertEquals(categories, [
['fruits'],
['vegetables'],
]);
assertEquals(products, [
['apple', 'fruits'],
['banana', 'fruits'],
['carrot', 'vegetables'],
]);
Postgres implicitly wraps a multi-statement query in a transaction. The implicit transaction rolls back automatically when an error occurs, or commits when all statements execute successfully. Multi-statement queries and implicit transactions are described here: https://www.postgresql.org/docs/18/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT
The top-level rows accessor contains the rows returned by the last SELECTish statement.
The iterator accessor iterates over the first row returned by the last SELECTish statement.
const retval = await pg.query(`
SELECT id FROM category;
-- Let's select products before update.
-- This is the last SELECTish statement
SELECT id, category_id FROM product ORDER BY id FOR UPDATE;
-- UPDATE is not SELECTish (unless RETURING used)
UPDATE product SET category_id = 'food';
`);
const { rows } = retval;
assertEquals(rows, [
['apple', 'fruits'],
['banana', 'fruits'],
['carrot', 'vegetables'],
]);
const [topProductId, topProductCategory] = retval;
assertEquals(topProductId, 'apple');
assertEquals(topProductCategory, 'fruits');
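The accessor rules above can be pictured with a small stand-in object. This is only a model of the described behaviour, not pgwire's real result class; `modelResult` is a hypothetical name, and "SELECTish" is approximated here as "the statement result carries a rows array".

```javascript
// Illustrative model only; the real result object may carry more fields.
function modelResult(results) {
  // Find the last statement result that produced rows.
  let lastSelectish = { rows: [] };
  for (const result of results) {
    if (Array.isArray(result.rows)) lastSelectish = result;
  }
  const rows = lastSelectish.rows;
  return {
    results, // per-statement results, in order
    rows, // rows of the last SELECTish statement
    // Iterating the result walks the first row of that statement.
    [Symbol.iterator]() {
      return (rows[0] ?? [])[Symbol.iterator]();
    },
  };
}

const modelled = modelResult([
  { rows: [['fruits'], ['vegetables']] },
  { rows: [['apple', 'fruits'], ['banana', 'fruits']] },
  { rows: null }, // stands in for UPDATE without RETURNING
]);
const [firstId, firstCategory] = modelled;
```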
Large datasets streaming
There are two ways to fetch a query result.
The first way is to call await pg.query(). In this case all rows are loaded into memory.
The other way is to call pg.stream() and iterate over data chunks.
const iterable = pg.stream(`SELECT i FROM generate_series(1, 2000) i`);
let sum = 0;
for await (const chunk of iterable)
for (const [i] of chunk.rows) {
sum += i;
}
// sum of natural numbers from 1 to 2000
assertEquals(sum, 2001000);
pg.stream() accepts the same parameters as pg.query() and supports parametrized and multi-statement queries.
TODO describe chunk shape
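When row-by-row processing is more convenient than chunk-by-chunk, the stream can be flattened with a small async generator. A minimal sketch, assuming only what the example above shows, namely that a chunk may carry a `rows` array; `rowsOf` and `fakeStream` are hypothetical names.

```javascript
// Hypothetical helper: flatten an async iterable of chunks into rows.
// Chunks without a `rows` array are skipped, since the full chunk shape
// is not documented here.
async function* rowsOf(chunks) {
  for await (const chunk of chunks) {
    yield* chunk.rows ?? [];
  }
}

// Stand-in for pg.stream(...) so the sketch runs without a database.
async function* fakeStream() {
  yield { rows: [[1], [2]] };
  yield {}; // a chunk with no rows
  yield { rows: [[3]] };
}

async function sumAll() {
  let sum = 0;
  for await (const [i] of rowsOf(fakeStream())) sum += i;
  return sum;
}
```

Only one chunk is held at a time, so memory use stays bounded by the chunk size rather than by the size of the result set.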
Copy from stdin
If the statement is COPY ... FROM STDIN, the stdin parameter must be set.
async function * generateData() {
const utf8enc = new TextEncoder();
for (let i = 1; i <= 3; i++) {
yield utf8enc.encode(i + '\t' + 'Mississippi' + '\n');
}
}
await pg.query({
statement: `COPY foo FROM STDIN`,
stdin: generateData(),
});
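Real data may contain tabs, newlines, backslashes or NULLs, which the COPY text format requires to be escaped. A minimal sketch of an encoder for the default text format (tab-separated, `\N` for NULL); `copyField` and `copyRows` are hypothetical helper names, not part of pgwire.

```javascript
const copyEncoder = new TextEncoder();

// Escape one value for the COPY text format.
function copyField(value) {
  if (value === null || value === undefined) return '\\N'; // NULL marker
  return String(value)
    .replace(/\\/g, '\\\\') // backslash first, so later escapes survive
    .replace(/\t/g, '\\t')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
}

// Turn an iterable of row arrays into an async stream of encoded lines,
// suitable for passing as the `stdin` parameter.
async function* copyRows(rows) {
  for (const row of rows) {
    yield copyEncoder.encode(row.map(copyField).join('\t') + '\n');
  }
}
```

Usage would look like `stdin: copyRows([[1, 'Mississippi'], [2, null]])` in place of `generateData()`.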
Copy to stdout
const upstream = pg.stream({
statement: `COPY foo TO STDOUT`,
});
const utf8dec = new TextDecoder();
let result = '';
for await (const chunk of upstream) {
result += utf8dec.decode(chunk);
}
assertEquals(result,
'1\tMississippi\n' +
'2\tMississippi\n' +
'3\tMississippi\n'
);
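Concatenating the whole output into one string defeats the purpose of streaming for large tables. A minimal sketch that yields one row at a time instead, assuming chunks are byte arrays as in the example above and that chunk boundaries may fall in the middle of a line; `copyLines` is a hypothetical name, and the sketch does not undo backslash escapes.

```javascript
// Hypothetical helper: split COPY TO STDOUT output into rows of fields.
async function* copyLines(chunks) {
  const utf8dec = new TextDecoder();
  let buffered = '';
  for await (const chunk of chunks) {
    // stream: true keeps multi-byte characters split across chunks intact.
    buffered += utf8dec.decode(chunk, { stream: true });
    const lines = buffered.split('\n');
    buffered = lines.pop(); // keep the incomplete tail for the next chunk
    for (const line of lines) yield line.split('\t');
  }
  buffered += utf8dec.decode(); // flush any pending bytes
  if (buffered) yield buffered.split('\t');
}
```

Usage would look like `for await (const [id, name] of copyLines(upstream)) { ... }`.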
Listen and Notify
https://www.postgresql.org/docs/18/sql-notify.html
pg.onnotification = ({ pid, channel, payload }) => {
try {
console.log(pid, channel, payload);
} catch (err) {
// handle error or let process exit
}
};
await pg.query(`LISTEN some_channel`);
TODO backpressure doc
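When a connection listens on several channels, the single onnotification callback has to route messages itself. A minimal sketch of such a router, assuming only the `{ pid, channel, payload }` shape shown above; `channelDispatcher` and the channel names are hypothetical.

```javascript
// Hypothetical helper: build an onnotification callback that routes
// notifications to per-channel handlers held in a Map.
function channelDispatcher(handlers) {
  return ({ pid, channel, payload }) => {
    const handler = handlers.get(channel);
    if (!handler) return; // no handler registered for this channel
    try {
      handler(payload, pid);
    } catch (err) {
      // A failing handler should not take the other channels down.
      console.error(`handler for ${channel} failed`, err);
    }
  };
}

const received = [];
const dispatch = channelDispatcher(new Map([
  ['cache_invalidate', (payload) => received.push(['cache', payload])],
  ['jobs', (payload) => received.push(['job', payload])],
]));
// With a real connection: pg.onnotification = dispatch;
```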
Simple and Extended query protocols
Postgres has two query protocols: simple and extended. The simple protocol allows sending a multi-statement query as a single script in which statements are delimited by semicolons. pgwire uses the simple protocol when .query() is called with a string as the first argument:
await pg.query(`
CREATE TABLE foo (a int, b text);
INSERT INTO foo VALUES (1, 'hello');
COPY foo FROM STDIN;
SELECT * FROM foo;
`, {
// optional stdin for COPY FROM STDIN statements
// (fs is Node's built-in module: import fs from 'node:fs')
stdin: fs.createReadStream('/tmp/file1.tsv'),
// stdin also accepts array of streams for multiple
// COPY FROM STDIN statements
stdins: [
fs.createReadStream(...),
fs.createReadStream(...),
...
],
});
The extended query protocol allows passing parameters for each statement, so statements are sent as separate chunks. To use the extended query protocol, pass one or more statement objects to the .query() function:
await pg.query({
statement: `CREATE TABLE foo (a int, b text)`,
}, {
statement: `INSERT INTO foo VALUES ($1, $2)`,
params: [
{ type: 'int4', value: 1 }, // $1
{ type: 'text', value: 'hello' }, // $2
],
}, {
statement: 'COPY foo FROM STDIN',
stdin: fs.createReadStream('/tmp/file1.tsv'),
}, {
statement: 'SELECT * FROM foo',
});
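Because statement objects are plain data, they can be generated from application data and spread into a single .query() call. A minimal sketch that reuses the statement/params shape from the example above; `insertStatements` is a hypothetical helper, and table `foo (a int, b text)` is the one created above.

```javascript
// Hypothetical helper: one parametrized INSERT statement object per row.
function insertStatements(rows) {
  return rows.map(([a, b]) => ({
    statement: `INSERT INTO foo VALUES ($1, $2)`,
    params: [
      { type: 'int4', value: a }, // $1
      { type: 'text', value: b }, // $2
    ],
  }));
}

const batch = insertStatements([
  [1, 'hello'],
  [2, 'world'],
]);
// With a real connection:
// await pg.query(...batch, { statement: 'SELECT * FROM foo' });
```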
Logical replication
Logical replication is a native PostgreSQL mechanism that allows your app to subscribe to data modification events such as insert, update, delete and truncate. It is useful for replica synchronization, cache invalidation or history tracking. https://www.postgresql.org/docs/18/logical-replication.html
Let's prepare the database for logical replication. First, we need to configure the PostgreSQL server to write enough information to the WAL:
ALTER SYSTEM SET wal_level = logical;
-- then restart postgres server
We need to create a replication slot for our app. A replication slot is a PostgreSQL entity that behaves like a message queue of replication events.
SELECT pg_create_logical_replication_slot(
'my_app_slot', -- slot names may contain only lower-case letters, digits and underscores
'test_decoding' -- logical decoder plugin
);
Generate some modification events:
CREATE TABLE foo(a INT NOT NULL PRIMARY KEY, b TEXT);
INSERT INTO foo VALUES (1, 'hello'), (2, 'world');
UPDATE foo SET b = 'all' WHERE a = 1;
DELETE FROM foo WHERE a = 2;
TRUNCATE foo;
Now we are ready to consume replication messages:
await using pg = pgconnection({ replication: 'database' }, Deno.env.get('POSTGRES'));
const replicationStream = pg.logicalReplication({ slot:
