SkillAgentSearch skills...

PyMyORM

Pure Python MySQL ORM

Install / Use

/learn @oldjun/PyMyORM
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

PyMyORM

Table of Contents

This package contains a pure-Python MySQL Object Relational Mapping client library.

<a href=#requirements>Requirements</a>

<a href=#installation>Installation</a>

Package is uploaded on PyPI.

You can install it with pip:

$python3 pip install PyMyORM

<a href=#documentation>Documentation</a>

Documentation is coming soon.

<a href=#example>Example</a>

table

The following examples use a simple table

create table `t_user` (
    `id` int unsigned not null auto_increment,
    `name` varchar(16) not null default '',
    `phone` varchar(16) not null default '',
    `money` decimal(10,2) not null default 0,
    `gender` tinyint unsigned not null default 0,
    `status` tinyint unsigned not null default 0,
    `time` timestamp not null default current_timestamp,
    primary key(`id`),
    unique key `idx_name` (`name`),
    key `idx_phone` (`phone`),
    key `idx_status` (`status`),
    key `idx_time` (`time`)
) engine=InnoDB auto_increment=1 default charset=utf8mb4;

model

the simplest definition of a model

from pymyorm.model import Model

class User(Model):
    tablename = 't_user'

by default model's primary key is id, if your table's primary key isn't id, you can adjust the model class's attributes like this:

from pymyorm.model import Model

class User(Model):
    tablename = 't_user'
    primary_key = 'primary key'

connect

connect to database at a single thread mode

from pymyorm.database import Database
config = dict(host='127.0.0.1', port=3306, user='root', password='password', database='test')
Database.connect(**config)

if your program is running at multi thread mode, you should use connection pool instead. see the connection pool section.

raw sql

sometimes we need to execute sql statement, like creating tables, do it like below.

from pymyorm.database import Database
fp = open('sql/t_user.sql', 'r', encoding='utf-8')
sql = fp.read()
fp.close()

Database.connect(**config)
Database.execute(sql)
from pymyorm.database import Database
from config import db

Database.connect(**db)
sql = "select * from t_user"
all = Database.query(sql)
for one in all:
    print(one)

tables

from pymyorm.database import Database
from config import db

Database.connect(**db)
tables = Database.tables()
for table in tables:
    print(table)

schema

from pymyorm.database import Database
from config import db

Database.connect(**db)
schemas = Database.schema('t_user')
for schema in schemas:
    print(schema)

reflection

from pymyorm.database import Database
from config import db

Database.connect(**db)
Database.reflect(table='t_user', model='models/user.py')

select

find one user which name is 'ping'

from models.user import User
one = User.find().where(name='ping').one()
print(one.id, one.name)

find one user which name is 'ping' and phone is '18976641111'

from models.user import User
one = User.find().where(name='ping').where(phone='18976641111').one()
print(one)

find one user which name is 'ping' and phone is '18976641111', in another way

from models.user import User
one = User.find().where(name='ping', phone='18976641111').one()
print(one)

find one user which money is not equals to 200

from models.user import User
one = User.find().where('money', '!=', 200).one()
print(one)
from models.user import User
all = User.find().order('id desc').offset(0).limit(5).all()
for one in all:
    print(one)

batch select

the all() function will return all data which matched the where conditions, if the table is too big, it will cost too much memory and slow down the program.

in this situation we can use batch() instead of all(). the code slice below shows each time read 100 users, until all to the end.

from models.user import User
batch = User.find().batch(size=100)
for all in batch:
    for one in all:
        print(one)

where

where condition support operator: =, !=, <, <=, >=, >, in, not in, like, not like, is, is not, between ,and also support plain expression:

from models.user import User
one = User.find().where("name='jack' or status=1").one()

update

find the user which name is 'lily', and change her money to 500, her phone to '18976642222'

from models.user import User
one = User.find().where(name='lily').one()
one.money = 500
one.phone = '18976642222'
one.save()

change the user which name is 'lily', update her money and phone directly

# case 2
from models.user import User
User.find().where(name='lily').update(money=500, phone='18976642222')

insert

insert one user into table

# case 1
from models.user import User
user = User(name='rose', phone='18976643333', money=100)
user.save()

insert one user into table, in another way

# case 2
from models.user import User
user = User()
user.name = 'vera'
user.phone = '18976644444'
user.money = 100
user.save()

batch insert

the save() function only insert/update one data at a time. we can use insert() function to insert more than one data at a time to improve the performance.

from models.user import User
fields = ('name', 'phone', 'money')
values = [
    ('jack', '18976643333', 120),
    ('sean', '18976654444', 160),
    ('vera', '18976645555', 180),
]
User.insert(fields, values)

delete

find the user which name is 'lily', and delete it

from models.user import User
one = User.find().where(name='lily').one()
one.delete()

delete the user which name is 'lily' directly

from models.user import User
User.find().where(money=100).delete()

find users which money more than 100, and delete it one by one

from models.user import User
all = User.find().where('money', '>', 100).all()
for one in all:
    one.delete()

delete the users which money more than 100 directly

from models.user import User
User.find().where('money', '>', 100).delete()

delete all users, don't do this if you don't know what you are doing.

from models.user import User
User.find().delete()

exists

find the user which name is 'ping' is exists or not, return True or False rather than the user data

from models.user import User
exists = User.find().where(name='ping').exists()

count

count the number of users which status is equal to 0

from models.user import User
count = User.find().where(status=0).count()

sum

sum of user's money

from models.user import User
money = User.find().sum('money')

min

find the minimal money of users, return the minimal money rather than the user data

from models.user import User
money = User.find().where(status=0).min('money')

max

find the maximal money of users, return the maximal money rather than the user data

from models.user import User
money = User.find().where(status=0).max('money')
print(money)

average

calculate the average money of the users, return the average money rather than the user data

from models.user import User
money = User.find().where(status=0).average('money')

scalar

find the user's money which name is jack

from models.user import User
money = User.find().where(name='jack').scalar('money')

column

list all user's name

from models.user import User
names = User.find().column('name')

group by

group the users by gender, and calculate the average money of each group, and return the users which group average money are more than 220

from models.user import User
all = User.find() \
    .select('gender', 'count(*) as count', 'avg(money) as avg', 'sum(money) as total') \
    .group('gender') \
    .having('avg', '>', 220) \
    .all()
for one in all:
    print(one)

group count

group the users by gender, and get the total number of groups

from models.user import User
total = User.find().group('gender').count()
print(total)

truncate

truncate the user table, don't do this if you don't know what you are doing.

from models.user import User
User.truncate()

join

find admin which role is 'roles' and which lock is 0

# case 1: inner join
from models.admin import Admin
from models.admin_role import AdminRole
all = Admin.find().select('a.*').alias('a') \
    .join(table=AdminRole.tablename, alias='r', on='a.role = r.id') \
    .where('r.name', '=', 'role1') \
    .where('a.lock', '=', 0) \
    .all()
for one in all:
    print(one)
# case 2: left join
from models.admin import Admin
from models.admin_role import AdminRole
all = Admin.find().alias('a') \
    .join(table=AdminRole.tablename, alias='r', on='a.role=r.id', type='left') \
    .where('a.lock', '=', 0) \
    .all()
for one in all:
    print(one)
# case 3
from models.admin import Admin
from models.admin_role import AdminRole
all = Admin.find().select('a.*').alias('a') \
    .join(table=AdminRole.tablename, alias='r', on='a.role=r.id') \
    .where('a.lock', '=', 0) \
    .all()
for one in all:
    print(one)
# case 4: join more than one table
from models.admin import Admin
from models.admin_role import AdminRole
from models.admin_auth import AdminAuth
all = Admin.find().select('username', 'a.role').alias('a')
View on GitHub
GitHub Stars8
CategoryData
Updated8mo ago
Forks8

Languages

Python

Security Score

77/100

Audited on Aug 4, 2025

No findings