The ORM¶
The ORM uses the active record pattern and talks to the database through a
pluggable StorageBackend. All database I/O is async: filter methods are
synchronous (they only build SQL), while terminal methods are async (they
execute it).
Defining a model¶
from sylvan.database.orm.model.base import Model
from sylvan.database.orm.primitives.fields import AutoPrimaryKey, Column, JsonColumn
from sylvan.database.orm.primitives.relations import BelongsTo, HasMany
from sylvan.database.orm.primitives.scopes import scope
class Repo(Model):
__table__ = "repos"
id = AutoPrimaryKey()
name = Column(type=str)
source_path = Column(type=str, nullable=True)
file_count = Column(type=int, default=0)
settings = JsonColumn(inner_type=dict, nullable=True)
files = HasMany("File", foreign_key="repo_id")
Column types¶
| Declaration | Python type | DB type |
|---|---|---|
| `Column(type=str)` | `str` | TEXT NOT NULL |
| `Column(type=int)` | `int` | INTEGER NOT NULL |
| `Column(type=float)` | `float` | REAL NOT NULL |
| `Column(type=bool)` | `bool` | INTEGER NOT NULL (0/1) |
| `Column(type=bytes)` | `bytes` | BLOB NOT NULL |
| `Column(type=str, nullable=True)` | `str \| None` | TEXT |
| `Column(type=int, default=0)` | `int` | INTEGER NOT NULL DEFAULT 0 |
| `Column(type=str, column_name="db_col")` | `str` | uses db_col in SQL |
| `AutoPrimaryKey()` | `int` | INTEGER PRIMARY KEY (auto-increment) |
| `JsonColumn(inner_type=list)` | `list` | TEXT (auto JSON serialize/deserialize) |
| `JsonColumn(inner_type=dict)` | `dict` | TEXT (auto JSON serialize/deserialize) |
Relations¶
class File(Model):
__table__ = "files"
id = AutoPrimaryKey()
repo_id = Column(type=int)
path = Column(type=str)
# Many-to-one: each file belongs to one repo
repo = BelongsTo("Repo", foreign_key="repo_id")
# One-to-many: each file has many symbols
symbols = HasMany("Symbol", foreign_key="file_id")
# One-to-one: each file has one quality record
quality = HasOne("QualityRecord", foreign_key="file_id")
# Many-to-many through a pivot table
class Workspace(Model):
__table__ = "workspaces"
id = AutoPrimaryKey()
name = Column(type=str)
repos = BelongsToMany(
"Repo",
pivot_table="workspace_repos",
foreign_key="workspace_id",
related_key="repo_id",
)
Relations are lazy by default. Access them after loading:
repo = await Repo.find(1)
await repo.load("files") # load the relation
files = repo.files # now accessible
Scopes¶
Scopes are reusable query fragments defined on the model:
class Symbol(Model):
__table__ = "symbols"
id = AutoPrimaryKey()
name = Column(type=str)
kind = Column(type=str)
repo_id = Column(type=int)
@scope
def functions(query):
return query.where(kind="function")
@scope
def in_repo(query, repo_name):
return query.join("repos", "repos.id = symbols.repo_id").where("repos.name", repo_name)
Use them as chainable methods:
functions = await Symbol.functions().in_repo("sylvan").get()
QueryBuilder¶
Query entry points such as Model.where(...) and Model.all() return a
QueryBuilder. Chain filter methods (sync), then call a terminal method
(async) such as get(), first(), or count() to execute the query.
Basic queries¶
# All records
repos = await Repo.all().get()
# Filter with kwargs
functions = await Symbol.where(kind="function", language="python").get()
# Filter with operator
large = await Symbol.where("line_count", ">", 100).get()
# First result
repo = await Repo.where(name="sylvan").first()
# Find by primary key
repo = await Repo.find(42)
repo = await Repo.find_or_fail(42) # raises if not found
Where variants¶
# IN clause
await Symbol.where_in("kind", ["function", "method"]).get()
# NOT equal
await Symbol.where_not(kind="constant").get()
# LIKE pattern
await Symbol.where_like("name", "test_%").get()
# NULL checks
await Symbol.where_null("summary").get()
await Symbol.where_not_null("docstring").get()
# BETWEEN
await Symbol.where_between("line_count", 10, 100).get()
# Subquery IN
subquery = Repo.where(name="sylvan").to_subquery("id")
await Symbol.where_in_subquery("repo_id", subquery).get()
# Grouped conditions (parenthesized)
await Symbol.where(kind="function").where_group(
lambda q: q.where_like("name", "test_%").or_where(language="typescript")
).get()
Joins, ordering, grouping¶
# Join
await Symbol.where(kind="class").join(
"files", "files.id = symbols.file_id"
).get()
# Left join
await Symbol.where(kind="function").left_join(
"quality_records", "quality_records.symbol_id = symbols.id"
).get()
# Order
await Symbol.all().order_by("name", "ASC").get()
await Symbol.all().order_by_desc("line_count").get()
# Group
await Symbol.all().select("kind").group_by("kind").get()
# Limit and offset
await Symbol.all().limit(20).offset(40).get()
Select and raw select¶
# Select specific columns
await Symbol.all().select("name", "kind").get()
# Raw SQL expression in select
await Symbol.all().select_raw("COUNT(*) as total").group_by("kind").get()
Aggregates¶
# Single aggregates
count = await Symbol.where(kind="function").count()
total = await Symbol.where(repo_id=1).sum("line_count")
average = await Symbol.where(kind="class").avg("complexity")
biggest = await Symbol.where(repo_id=1).max("line_count")
smallest = await Symbol.where(repo_id=1).min("line_count")
# With group_by, aggregates return dicts
counts = await Symbol.all().group_by("kind").count()
# => {"function": 120, "class": 45, "method": 300, ...}
Multi-aggregate¶
from sylvan.database.orm.query.execution import Sum, Avg, Count, Max
stats = await Symbol.where(repo_id=1).aggregates(
total_lines=Sum("line_count"),
avg_complexity=Avg("complexity"),
symbol_count=Count("*"),
max_lines=Max("line_count"),
)
# => {"total_lines": 5000, "avg_complexity": 3.2, "symbol_count": 500, "max_lines": 200}
Subqueries¶
# Build a subquery without executing
subquery = Symbol.where(kind="function").to_subquery("file_id")
# Use in another query
files = await File.where_in_subquery("id", subquery).get()
Search¶
# FTS5 full-text search
results = await Symbol.search("parse json config").get()
# Vector similarity search
results = await Symbol.similar_to("function that parses configuration", k=20).get()
# Combine search with filters
results = await (
Symbol.search("authentication")
.where(kind="function")
.where(language="python")
.limit(10)
.get()
)
Eager loading¶
# Load relations in batch (avoids N+1)
symbols = await Symbol.where(kind="class").with_("file").get()
for s in symbols:
print(s.file.path) # already loaded, no extra query
# Count related records
repos = await Repo.all().with_count("files").get()
Pagination¶
page = await Symbol.where(kind="function").paginate(page=2, per_page=20)
# => {
# "data": [...], # list of Symbol instances
# "total": 500, # total matching records
# "page": 2,
# "per_page": 20,
# "total_pages": 25,
# }
Conditional queries¶
include_tests = True
results = await (
Symbol.where(kind="function")
.when(include_tests, lambda q: q.where_like("name", "test_%"))
.get()
)
Exists and pluck¶
# Check existence without loading records
has_py = await Symbol.where(language="python").exists()
# Get a flat list of one column
names = await Symbol.where(kind="class").pluck("name")
# => ["Model", "QueryBuilder", "Schema", ...]
Chunked processing¶
async def process_batch(symbols):
for s in symbols:
print(s.name)
await Symbol.all().chunk(100, process_batch)
CRUD operations¶
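Create¶
# Insert a new row and get the saved instance back
repo = await Repo.create(name="my-project", source_path="/code")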
Save (insert or update)¶
repo = Repo(name="my-project", source_path="/code")
await repo.save() # INSERT (no id yet)
repo.file_count = 42
await repo.save() # UPDATE (has id now)
Update¶
# Instance update
repo = await Repo.find(1)
await repo.update(file_count=100, source_path="/new/path")
# Bulk update via query
await Symbol.where(repo_id=1).update(summary=None)
Delete¶
# Instance delete
repo = await Repo.find(1)
await repo.delete()
# Bulk delete via query
count = await Symbol.where(repo_id=1).delete()
Upsert¶
repo = await Repo.upsert(
conflict_columns=["name"],
update_columns=["source_path", "file_count"],
name="my-project",
source_path="/updated/path",
file_count=50,
)
Insert or ignore / replace¶
# Skip if exists
await Repo.insert_or_ignore(name="my-project", source_path="/code")
# Replace if exists
await Repo.insert_or_replace(name="my-project", source_path="/new/code")
First or create / update or create¶
# Find or create
repo = await Repo.first_or_create(
search_by={"name": "my-project"},
source_path="/code",
)
# Find and update, or create
repo = await Repo.update_or_create(
search_by={"name": "my-project"},
source_path="/updated/path",
)
Bulk operations¶
# Insert many records at once
count = await Symbol.bulk_create([
{"name": "foo", "kind": "function", "repo_id": 1, "file_id": 1},
{"name": "bar", "kind": "function", "repo_id": 1, "file_id": 1},
{"name": "Baz", "kind": "class", "repo_id": 1, "file_id": 1},
])
# Upsert many at once
count = await Symbol.bulk_upsert(
records=[
{"symbol_id": "a::foo#function", "name": "foo", "kind": "function"},
{"symbol_id": "a::bar#function", "name": "bar", "kind": "function"},
],
conflict_columns=["symbol_id"],
update_columns=["name", "kind"],
)
# Update many rows with different values per row
count = await Symbol.bulk_update(
records=[
{"id": 1, "summary": "Updated summary for foo"},
{"id": 2, "summary": "Updated summary for bar"},
],
)
# Specify a custom primary key column
count = await Symbol.bulk_update(
records=[
{"symbol_id": "a::foo#function", "summary": "new summary"},
{"symbol_id": "a::bar#function", "summary": "new summary"},
],
pk_column="symbol_id",
)
bulk_update generates a single UPDATE ... SET col = CASE WHEN pk = ? THEN ? ... END
statement per batch, which is far more efficient than individual update() calls when
you need to set different values on each row.
Raw SQL¶
For queries the builder cannot express:
results = await Symbol.raw(
"SELECT * FROM symbols WHERE name LIKE ? AND repo_id = ?",
["%test%", 1],
)
Query debugging¶
# Enable query logging
QueryBuilder.enable_debug()
# Run queries...
await Symbol.where(kind="function").get()
# Read the log
for sql, params in QueryBuilder.get_query_log():
print(sql, params)
# Clean up
QueryBuilder.clear_query_log()
QueryBuilder.disable_debug()