from IPython.display import Markdown
+from fastcore.test import test_fail, test_eq
Source
Metadata
+from fastcore.net import urlsave
+= 'https://github.com/lerocha/chinook-database/raw/master/ChinookDatabase/DataSources/Chinook_Sqlite.sqlite'
+ url = Path('chinook.sqlite')
+ path if not path.exists(): urlsave(url, path)
= Database("chinook.sqlite") db
+ +
Database.t
++++Database.t ()
Exported source
+class _Getter:
+"Abstract class with dynamic attributes providing access to DB objects"
+ def __init__(self, db): self.db = db
+ # NB: Define `__dir__` in subclass to get list of objects
+ def __repr__(self): return ", ".join(dir(self))
+ def __contains__(self, s): return (s if isinstance(s,str) else s.name) in dir(self)
+ def __getitem__(self, idxs):
+ if isinstance(idxs,str): return self.db.table(idxs)
+ return [self.db.table(o) for o in idxs]
+ def __getattr__(self, k):
+ if k[0]=='_': raise AttributeError
+ return self.db[k]
+
+class _TablesGetter(_Getter):
+def __dir__(self): return self.db.table_names()
+
+@patch(as_prop=True)
+def t(self:Database): return _TablesGetter(self)
By returning a _TablesGetter
we get a repr and auto-complete that shows all tables in the DB.
= db.t
+ dt dt
Album, Artist, Customer, Employee, Genre, Invoice, InvoiceLine, MediaType, Playlist, PlaylistTrack, Track, cat
+= dt.Artist
+ artist artist
<Table Artist (ArtistId, Name)>
+This also can be used to get multiple tables at once.
+'Album','Artist'] dt[
[<Table Album (AlbumId, Title, ArtistId)>, <Table Artist (ArtistId, Name)>]
+assert 'Artist' in dt
+assert artist in dt
+assert 'foo' not in dt
+ +
View.c
++++View.c ()
Exported source
+class _Col:
+def __init__(self, t, c): self.t,self.c = t,c
+ def __str__(self): return f'"{self.t}"."{self.c}"'
+ def __repr__(self): return self.c
+ def __iter__(self): return iter(self.c)
+
+class _ColsGetter:
+def __init__(self, tbl): self.tbl = tbl
+ def __dir__(self): return map(repr, self())
+ def __call__(self): return [_Col(self.tbl.name,o.name) for o in self.tbl.columns]
+ def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.columns_dict
+ def __repr__(self): return ", ".join(dir(self))
+
+def __getattr__(self, k):
+ if k[0]=='_': raise AttributeError
+ return _Col(self.tbl.name, k)
+
+@patch(as_prop=True)
+def c(self:Table): return _ColsGetter(self)
+
+@patch(as_prop=True)
+def c(self:View): return _ColsGetter(self)
+ +
Table.c
++++Table.c ()
Column auto-complete and repr are much the same as tables.
+= artist.c
+ ac ac
ArtistId, Name
+Columns stringify in a format suitable for including in SQL statements.
+print(f"select {ac.Name} ...")
select "Artist"."Name" ...
++ +
View.__str__
++++View.__str__ ()
Return str(self).
+Exported source
+@patch
+def __str__(self:Table): return f'"{self.name}"'
+
+@patch
+def __str__(self:View): return f'"{self.name}"'
+ +
Table.__str__
++++Table.__str__ ()
Return str(self).
+Tables and views do the same.
+print(f"select {ac.Name} from {artist}")
select "Artist"."Name" from "Artist"
+assert 'Name' in ac
+assert ac.Name in ac
+assert 'foo' not in ac
Queries and views
++ +
Database.q
++++Database.q (sql:str, params=None)
Exported source
+@patch
+def q(self:Database, sql: str, params = None)->list:
+return list(self.query(sql, params=params))
This is a minor shortcut for interactive use.
+= db.q(f"select * from {artist} where {ac.Name} like 'AC/%'")
+ acdc acdc
[{'ArtistId': 1, 'Name': 'AC/DC'}]
+Exported source
+def _get_flds(tbl):
+return [(k, v|None, field(default=tbl.default_values.get(k,None)))
+ for k,v in tbl.columns_dict.items()]
+
+def _dataclass(self:Table, store=True, suf='')->type:
+"Create a `dataclass` with the types and defaults of this table"
+ = make_dataclass(self.name.title()+suf, _get_flds(self))
+ res
+ flexiclass(res)if store: self.cls = res
+ return res
+
+= _dataclass Table.dataclass
= artist.dataclass()
+ artist_dc = artist_dc(**acdc[0])
+ art1_obj art1_obj
Artist(ArtistId=1, Name='AC/DC')
+You can get the definition of the dataclass using fastcore’s dataclass_src
:
= dataclass_src(artist_dc)
+ src 'python') hl_md(src,
@dataclass
+class Artist:
+int | None = None
+ ArtistId: str | None = None Name:
+ +
all_dcs
++++all_dcs (db, with_views=False, store=True, suf='')
dataclasses for all objects in db
Exported source
+def all_dcs(db, with_views=False, store=True, suf=''):
+"dataclasses for all objects in `db`"
+ return [o.dataclass(store=store, suf=suf) for o in db.tables + (db.views if with_views else [])]
+ +
create_mod
++++create_mod (db, mod_fn, with_views=False, store=True, suf='')
Create module for dataclasses for db
Exported source
+def create_mod(db, mod_fn, with_views=False, store=True, suf=''):
+"Create module for dataclasses for `db`"
+ = str(mod_fn)
+ mod_fn if not mod_fn.endswith('.py'): mod_fn+='.py'
+ with open(mod_fn, 'w') as f:
+ print('from dataclasses import dataclass', file=f)
+ print('from typing import Any,Union,Optional\n', file=f)
+ for o in all_dcs(db, with_views, store=store, suf=suf): print(dataclass_src(o), file=f)
'db_dc') create_mod(db,
from db_dc import Track
+**dt.Track.get(1)) Track(
Track(TrackId=1, Name='For Those About To Rock (We Salute You)', AlbumId=1, MediaTypeId=1, GenreId=1, Composer='Angus Young, Malcolm Young, Brian Johnson', Milliseconds=343719, Bytes=11170334, UnitPrice=0.99)
++
call’]
+*Built-in mutable sequence.
+If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.*
+Exported source
+@patch
+def __call__(
+self:(Table|View),
+ str|None=None, # SQL where fragment to use, for example `id > ?`
+ where:|dict|NoneType=None, # Parameters to use with `where`; iterable for `id>?`, or dict for `id>:id`
+ where_args: Iterablestr|None=None, # Column or fragment of SQL to order by
+ order_by: int|None=None, # Number of rows to limit to
+ limit:int|None=None, # SQL offset
+ offset:str = "*", # Comma-separated list of columns to select
+ select:bool=False, # Return tuple of (pk,row)?
+ with_pk:bool=True, # Convert returned dict to stored dataclass?
+ as_cls:dict|None=None, # Extra constraints
+ xtra:**kwargs)->list:
+ "Shortcut for `rows_where` or `pks_and_rows_where`, depending on `with_pk`"
+ = getattr(self, 'pks_and_rows_where' if with_pk else 'rows_where')
+ f if not xtra: xtra = getattr(self, 'xtra_id', {})
+ if xtra:
+ = ' and '.join(f"[{k}] = {v!r}" for k,v in xtra.items())
+ xw = f'{xw} and {where}' if where else xw
+ where = f(where=where, where_args=where_args, order_by=order_by, limit=limit, offset=offset, select=select, **kwargs)
+ res if as_cls and hasattr(self,'cls'):
+ if with_pk: res = ((k,self.cls(**v)) for k,v in res)
+ else: res = (self.cls(**o) for o in res)
+ return list(res)
This calls either rows_where
(if with_pk
) or with_pk
(otherwise). If dataclass(store=True)
has been called, then if as_cls
rows will be returned as dataclass objects.
=2) artist(limit
[Artist(ArtistId=1, Name='AC/DC'), Artist(ArtistId=2, Name='Accept')]
+If with_pk
then tuples are returns with PKs 1st.
=True, limit=2) artist(with_pk
[(1, Artist(ArtistId=1, Name='AC/DC')), (2, Artist(ArtistId=2, Name='Accept'))]
+1) artist.get(
{'ArtistId': 1, 'Name': 'AC/DC'}
+= dt.Album
+ album
+= f"""select {album}.*
+ acca_sql from {album} join {artist} using (ArtistId)
+where {ac.Name} like 'AC/%'"""
'sql') hl_md(acca_sql,
select "Album".*
+from "Album" join "Artist" using (ArtistId)
+where "Artist"."Name" like 'AC/%'
db.q(acca_sql)
[{'AlbumId': 1,
+ 'Title': 'For Those About To Rock We Salute You',
+ 'ArtistId': 1},
+ {'AlbumId': 4, 'Title': 'Let There Be Rock', 'ArtistId': 1}]
+"AccaDaccaAlbums", acca_sql, replace=True) db.create_view(
<Database <sqlean.dbapi2.Connection object>>
++ +
Database.v
++++Database.v ()
Exported source
+class _ViewsGetter(_Getter):
+def __dir__(self): return self.db.view_names()
+
+@patch(as_prop=True)
+def v(self:Database): return _ViewsGetter(self)
= db.v
+ dv dv
AccaDaccaAlbums
+ dv.AccaDaccaAlbums()
[{'AlbumId': 1,
+ 'Title': 'For Those About To Rock We Salute You',
+ 'ArtistId': 1},
+ {'AlbumId': 4, 'Title': 'Let There Be Rock', 'ArtistId': 1}]
+Exported source
+def _parse_typ(t): return t if not (_args:= get_args(t)) else first(_args, bool)
int, None]) _parse_typ(Union[
int
++ +
get_typ
++++get_typ (t)
Get the underlying type.
+int, None]) get_typ(Union[
int
+int) get_typ(
int
+If you have an Enum
where all the fields are the same type, then _get_typ
will return that type.
class _Test(Enum): foo='val1'; bar=2
+class _Test2(Enum): foo='val3'; bar='val4'
# fields are not the same type
+ get_typ(_Test)
<enum '_Test'>
+# fields are all of type `str`
+ get_typ(_Test2)
str
++ +
Database.create
++++Database.create (cls=None, name=None, pk='id', foreign_keys=None, + defaults=None, column_order=None, not_null=None, + hash_id=None, hash_id_columns=None, extracts=None, + if_not_exists=False, replace=False, ignore=True, + transform=False, strict=False)
Create table from cls
, default name to snake-case version of class name
+ | Type | +Default | +Details | +
---|---|---|---|
cls | +NoneType | +None | +Dataclass to create table from | +
name | +NoneType | +None | +Name of table to create | +
pk | +str | +id | +Column(s) to use as a primary key | +
foreign_keys | +NoneType | +None | +Foreign key definitions | +
defaults | +NoneType | +None | +Database table defaults | +
column_order | +NoneType | +None | +Which columns should come first | +
not_null | +NoneType | +None | +Columns that should be created as NOT NULL |
+
hash_id | +NoneType | +None | +Column to be used as a primary key using hash | +
hash_id_columns | +NoneType | +None | +Columns used when calculating hash | +
extracts | +NoneType | +None | +Columns to be extracted during inserts | +
if_not_exists | +bool | +False | +Use CREATE TABLE IF NOT EXISTS |
+
replace | +bool | +False | +Drop and replace table if it already exists | +
ignore | +bool | +True | +Silently do nothing if table already exists | +
transform | +bool | +False | +If table exists transform it to fit schema | +
strict | +bool | +False | +Apply STRICT mode to table | +
The class you pass to create
is converted to a dataclass where any fields missing a default are defaulted to None
.
class Nm(Enum): fn='meow'; ln='prr'
+
+ class Cat: id: int; name:Nm|None; age: int|None; city: str = "Unknown"
+= db.create(Cat)
+ cats 1) Cat(
Cat(id=1, name=UNSET, age=UNSET, city='Unknown')
+print(cats.schema)
CREATE TABLE [cat] (
+ [id] INTEGER PRIMARY KEY,
+ [name] TEXT,
+ [weight] FLOAT
+)
+ db.create(Cat)
<Table cat (id, name, weight)>
+To transform a table after creation, use the .create()
method again, this time with the transform
keyword set to True
.
class Cat: id: int; name: str; age: int; city: str = "Unknown"; breed: str = "Unknown"
+= db.create(Cat, transform=True)
+ cats cats
<Table cat (id, name, weight)>
+1) Cat(
Cat(id=1, name=UNSET, age=UNSET, city='Unknown', breed='Unknown')
+print(cats.schema)
CREATE TABLE [cat] (
+ [id] INTEGER PRIMARY KEY,
+ [name] TEXT,
+ [weight] FLOAT
+)
+ db.t.cat.drop()
+ +
Database.import_file
++++Database.import_file (table_name, file, format=None, pk=None, + alter=False)
Import path or handle file
to new table table_name
This uses sqlite_utils.utils.rows_from_file
to load the file.
= Database(":memory:")
+ db = "id,name,age\n1,Alice,30\n2,Bob,25"
+ csv1 = "id,name,age\n3,Charlie,35\n4,David,40"
+ csv2 = "id,name,age,city\n5,Eve,45,New York"
+ csv3
+# import file to new table
+= db.import_file("people", csv1)
+ tbl assert len(tbl()) == 2
+
+# import file to existing table (same schema)
+= db.import_file("people", csv2)
+ tbl assert len(tbl()) == 4
+
+# import file to existing table (schema change fails)
+lambda: db.import_file("people", csv3),contains='city')
+ test_fail(
+# import file to existing table (schema change succeeds)
+assert 'city' not in tbl.c
+= db.import_file("people", csv3, alter=True)
+ tbl assert 'city' in tbl.c
+
+print(tbl())
+ tbl.drop()
[{'id': 1, 'name': 'Alice', 'age': 30, 'city': None}, {'id': 2, 'name': 'Bob', 'age': 25, 'city': None}, {'id': 3, 'name': 'Charlie', 'age': 35, 'city': None}, {'id': 4, 'name': 'David', 'age': 40, 'city': None}, {'id': 5, 'name': 'Eve', 'age': 45, 'city': 'New York'}]
+Database diagrams
+(Requires graphviz.)
+= album.foreign_keys[0]
+ fk fk
ForeignKey(table='Album', column='ArtistId', other_table='Artist', other_column='ArtistId')
++ +
diagram
++++diagram (tbls, ratio=0.7, size='10', neato=False, render=True)
Exported source
+def _edge(tbl):
+return "\n".join(f"{fk.table}:{fk.column} -> {fk.other_table}:{fk.other_column};"
+ for fk in tbl.foreign_keys)
+
+def _row(col):
+= " 🔑" if col.is_pk else ""
+ xtra = ' bgcolor="#ffebcd"' if col.is_pk else ""
+ bg return f' <tr><td port="{col.name}"{bg}>{col.name}{xtra}</td></tr>'
+
+def _tnode(tbl):
+= "\n".join(_row(o) for o in tbl.columns)
+ rows = f"""<table cellborder="1" cellspacing="0">
+ res <tr><td bgcolor="lightgray">{tbl.name}</td></tr>
+{rows}
+ </table>"""
+return f"{tbl.name} [label=<{res}>];\n"
Exported source
+def diagram(tbls, ratio=0.7, size="10", neato=False, render=True):
+= "\nlayout=neato;\noverlap=prism;\noverlap_scaling=0.5;""" if neato else ""
+ layout = "\n".join(map(_edge, tbls))
+ edges = "\n".join(map(_tnode, tbls))
+ tnodes
+ = f"""digraph G {{
+ res rankdir=LR;{layout}
+size="{size}";
+ratio={ratio};
+node [shape=plaintext]
+
+{tnodes}
+
+{edges}
+}}
+"""
+return Source(res) if render else res
= Database("chinook.sqlite") db
diagram(db.tables)