DynamoDB SingleTable¶
https://pypi.org/project/ddb-single/
Python DynamoDB interface, specialized in single-table design. DynamoDB is high-performance serverless NoSQL, but difficult to disign tables.
Single-table design needs only single table, and few GSIs (Global Secondary Indexes). It makes effective and easy to manage your whole data models for single service.
Getting Started¶
Install¶
pip install ddb-single
Start DynamoDB Local¶
docker run -d --rm -p 8000:8000 amazon/dynamodb-local
Init Table¶
from ddb_single import Table
table = Table(
table_name="sample",
endpoint_url="http://localhost:8000",
)
table.init()
Data Models¶
Each model has al least 3 keys
primary_key … Hash key for single item. default:
pk: {__model_name__}_{uuid}
seconday_key … Range key for item. default:
sk: {__model_name__}_item
unique_key … key to identify the item is the same. Mainly used to update item.
And you can set serch_key
to enable search via GSI
from ddb_single import BaseModel, DBField, FieldType
class User(BaseModel):
__table__=table
__model_name__ = "user"
name = DBField(unique_key=True)
email = DBField(search_key=True)
age = DBField(type=FieldType.NUMBER, search_key=True)
description=DBField()
Usage¶
need “Qurey” object for CRUD
query.model(foo).create
query.model(foo).get
query.model(foo).search
query.model(foo).update
query.model(foo).delete
from ddb_single import Query
query = Query(table)
Create Item¶
If the item with same value of unique_key
already exist, exist item is updated.
user = User(name="John", email="john@example.com", description="test")
query.model(user).create()
Then, multible items added.
pk |
sk |
data |
name |
description |
|
---|---|---|---|---|---|
user_xxxx |
user_item |
John |
john@example.com |
test |
|
user_xxxx |
search_user_name |
John |
|||
user_xxxx |
search_user_email |
new-john@example.com |
In addition to main item (sk=user_item
), multiple item (sk=search_{__model_name__}_{field_name}
) added to table.
Those “search items” are used to search
The GSI DataSearchIndex
is used to get “search items” to extract target’s pk.
Then, batch_get
items by pk.
sk = hash |
data = range |
pk |
---|---|---|
search_user_name |
John |
user_xxxx |
search_user_email |
new-john@example.com |
user_xxxx |
Search Items¶
user = query.model(User).search(User.name.eq("John"))
print(user)
# -> [{"pk":"user_xxxx", "sk":"user_item", "name":"John", "email":"john@example.com"}]
pk_only=True
to extract pk without batch_get
user_pks = query.model(User).search(User.name.eq("John"), pk_only=True)
print(user_pks)
# -> ["user_xxxx"]
Get single item¶
get(pk)
to get single item.
user = query.model(User).get("user_xxxx")
print(user)
# -> {"pk":"user_xxxx", "sk":"user_item", "name":"John", "email":"john@example.com"}
get_by_unique
to get item by unique_key
user = query.model(User).get_by_unique("John")
print(user)
# -> {"pk":"user_xxxx", "sk":"user_item", "name":"John", "email":"john@example.com"}
pk_only=True
option in get_by_unique
to get primary key
without get_item
pk = query.model(User).get_by_unique("John", pk_only=True)
print(pk)
# -> "user_xxxx"
Update Item¶
user = query.model(User).search(User.email.eq("john@example.com"))
new_user = User(**user[0])
new_user.email = "new-john@example.com"
query.model(new_user).update()
Or use unique value to detect exist item.
new_user = User(name="John", email="new-john@example.com")
query.model(new_user).update()
Then, tha value of “main item” and “seach item” changed
pk |
sk |
data |
name |
description |
|
---|---|---|---|---|---|
user_xxxx |
user_item |
John |
new-john@example.com |
test |
|
user_xxxx |
search_user_name |
John |
|||
user_xxxx |
search_user_email |
new-john@example.com |
Delete Item¶
user = query.model(User).search(User.email.eq("new-john@example.com"))
query.model(user[0]).delete()
primary key
to detect exist item.
query.model(User).delete_by_pk("user_xxxx")
or unique key
query.model(User).delete_by_unique("John")
Batch Writer¶
table.batch_writer()
to create/update/delete multible items
query.model(foo).create(batch=batch)
query.model(foo).update(batch=batch)
query.model(foo).delete(batch=batch)
Batch Create¶
with table.batch_writer() as batch:
for i in range(3):
user = User(name=f"test{i}", age=i+10)
query.model(user).create(batch=batch)
res = query.model(User).search(User.name.begins_with("test"))
print([(r["name"], r["age"]) for r in res])
# -> [("test0", 10), ("test1", 11), ("test2", 12)]
Batch Update¶
with table.batch_writer() as batch:
for i in range(3):
user = User(name=f"test{i}", age=i+20)
query.model(user).update(batch=batch)
res = query.model(User).search(User.name.begins_with("test"))
print([(r["name"], r["age"]) for r in res])
# -> [("test0", 20), ("test1", 21), ("test2", 22)]
Batch Delete¶
pks = query.model(User).search(User.name.begins_with("test"), pk_only=True)
with table.batch_writer() as batch:
for pk in pks:
query.model(user).delete_by_pk(pk, batch=batch)
res = query.model(User).search(User.name.begins_with("test"))
print(res)
# -> []
Relationship¶
Create Model¶
You can sat relationns to other models
relation=BaseModel
to set relation.
class BlogPost(BaseModel):
__model_name__ = "blogpost"
__table__=table
name = DBField(unique_key=True)
content = DBField()
author = DBField(reletion=User)
Create Item¶
blogpost = BlogPost(
name="Hello",
content="Hello world",
author=self.user
)
query.model(blogpost).create()
Then, tha value “reletion item” added
pk |
sk |
data |
name |
author |
content |
---|---|---|---|---|---|
user_xxxx |
user_item |
John |
|||
user_xxxx |
search_user_name |
John |
|||
blogpost_xxxx |
blogpost_item |
Hello |
John |
Hello world |
|
blogpost_xxxx |
search_blogpost_title |
Hello |
|||
blogpost_xxxx |
rel_user_xxxx |
author |
In addition to main item (sk=blogpost_item
), relation item (sk=rel_{primary_key}
) added to table. The GSI DataSearchIndex
is used to get “relation items” to extract target’s pk.
Then, batch_get
items by pk.
sk = hash |
data = range |
pk |
---|---|---|
rel_user_xxxx |
author |
blogpost_xxxx |
Search Relations¶
get_relation(model=Basemodel)
to search relations
blogpost = query.model(BlogPost).get_by_unique("Hello")
blogpost = BlogPost(**blogpost)
user = query.model(blogpost).get_relation(model=User)
print(user)
# -> [{"pk":"user_xxxx", "sk":"user_item", "name":"John"}]
Also get_relation(field=DBField)
to specify field
user = query.model(blogpost).get_relation(field=BlogPost.author)
print(user)
# -> [{"pk":"user_xxxx", "sk":"user_item", "name":"John"}]
Search Reference¶
In this library, “reference” is antonym to relation
get_reference(model=Basemodel)
to search items related to the item
user = query.model(User).get_by_unique("John")
user = User(**blogpost)
blogpost = query.model(blogpost).get_reference(model=BlogPost)
print(blogpost)
# -> [{"pk":"blogpost_xxxx", "sk":"blogpost_item", "name":"Hello"}]
Also get_reference(field=DBField)
to specify field
blogpost = query.model(user).get_reference(field=BlogPost.author)
print(blogpost)
# -> [{"pk":"blogpost_xxxx", "sk":"blogpost_item", "name":"Hello"}]
Update Relation¶
If relation key’s value changed, relationship also changed.
new_user = User(name="Michael")
blogpost = query.model(BlogPost).get_by_unique("Hello")
blogpost["author"] = new_user
blogpost = BlogPost(**blogpost)
query.model(blogpost).update()
Then, “reletion item” changed
pk |
sk |
data |
name |
author |
content |
---|---|---|---|---|---|
user_xxxx |
user_item |
John |
|||
user_xxxx |
search_user_name |
John |
|||
user_yyyy |
user_item |
Michael |
|||
user_yyyy |
search_user_name |
Michael |
|||
blogpost_xxxx |
blogpost_item |
Hello |
Michael |
Hello world |
|
blogpost_xxxx |
search_blogpost_title |
Hello |
|||
blogpost_xxxx |
rel_user_yyyy |
author |
Delete Relation¶
If related item deleted, relationship also deleted
query.model(user).delete_by_unique("Michael")
Then, “reletion item” deleted. But main item’s value is not chenged.
pk |
sk |
data |
name |
author |
content |
---|---|---|---|---|---|
user_xxxx |
user_item |
John |
|||
user_xxxx |
search_user_name |
John |
|||
blogpost_xxxx |
blogpost_item |
Hello |
Michael |
Hello world |
|
blogpost_xxxx |
search_blogpost_title |
Hello |