-
Notifications
You must be signed in to change notification settings - Fork 400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support for async? #679
Comments
Sometimes you just need to lay down your thoughts to get the proper idea. Also, the fresh mind helps (I was doing my experiments at 4am yesterday:) I am not sure this is correct way to proceed and whether there are some unforeseen consequences that will shot me in the knee later but here's what I did:
Then in my test I do
and get my new shiny user object created properly in DB! So far I am very happy with the result. If more wise and experienced developers won't see any issues with this approach I think it may worse adding something like this to the recipes section as async stack is getting more and more popular. |
Hey, do you know how I can maybe define awaitable Mock in your code? I need to define abstract without ORM factory like that: class ObjectFactory(factory.Factory):
class Meta:
abstract = True
model = Mock
@classmethod
def _create(cls, model_class, *args, **kwargs):
async def create_coro(*args, **kwargs):
return await model_class(*args, **kwargs)
return create_coro(*args, **kwargs) But |
https://mock.readthedocs.io/en/latest/changelog.html#b1 has AsyncMock |
Thanks, that really helped me. class AsyncFactory(factory.Factory):
@classmethod
def _create(cls, model_class, *args, **kwargs):
async def maker_coroutine():
for key, value in kwargs.items():
# when using SubFactory, you'll have a Task in the corresponding kwarg
# await tasks to pass model instances instead
if inspect.isawaitable(value):
kwargs[key] = await value
# replace as needed by your way of creating model instances
return await model_class.create_async(*args, **kwargs)
# A Task can be awaited multiple times, unlike a coroutine.
# useful when a factory and a subfactory must share a same object
return asyncio.create_task(maker_coroutine())
@classmethod
async def create_batch(cls, size, **kwargs):
return [await cls.create(**kwargs) for _ in range(size)]
class UserFactory(AsyncFactory):
...
class Category(AsyncFactory):
...
creator = factory.SubFactory(UserFactory)
class ArticleFactory(AsyncFactory):
...
author = factory.SubFactory(UserFactory)
category = factory.SubFactory(CategoryFactory, creator=factory.SelfAttribute(..author)) In the following example: article = await ArticleFactory.create()
assert article.author == article.category.creator The |
Here comes my solution to this. Partially inspired by @nadege 😄
Usage:
|
I also tried to use factory boy with an async ORM and tried to use RelatedFactory. I got this: import inspect
import factory
from factory.builder import StepBuilder, BuildStep, parse_declarations
class AsyncFactory(factory.Factory):
@classmethod
async def _generate(cls, strategy, params):
if cls._meta.abstract:
raise factory.errors.FactoryError(
"Cannot generate instances of abstract factory %(f)s; "
"Ensure %(f)s.Meta.model is set and %(f)s.Meta.abstract "
"is either not set or False." % dict(f=cls.__name__))
step = AsyncStepBuilder(cls._meta, params, strategy)
return await step.build()
@classmethod
async def _create(cls, model_class, *args, **kwargs):
for key, value in kwargs.items():
if inspect.isawaitable(value):
kwargs[key] = await value
return await model_class.create(*args, **kwargs)
@classmethod
async def create_batch(cls, size, **kwargs):
return [await cls.create(**kwargs) for _ in range(size)]
class AsyncStepBuilder(StepBuilder):
# Redefine build function that await for instance creation and awaitable postgenerations
async def build(self, parent_step=None, force_sequence=None):
"""Build a factory instance."""
# TODO: Handle "batch build" natively
pre, post = parse_declarations(
self.extras,
base_pre=self.factory_meta.pre_declarations,
base_post=self.factory_meta.post_declarations,
)
if force_sequence is not None:
sequence = force_sequence
elif self.force_init_sequence is not None:
sequence = self.force_init_sequence
else:
sequence = self.factory_meta.next_sequence()
step = BuildStep(
builder=self,
sequence=sequence,
parent_step=parent_step,
)
step.resolve(pre)
args, kwargs = self.factory_meta.prepare_arguments(step.attributes)
instance = await self.factory_meta.instantiate(
step=step,
args=args,
kwargs=kwargs,
)
postgen_results = {}
for declaration_name in post.sorted():
declaration = post[declaration_name]
declaration_result = declaration.declaration.evaluate_post(
instance=instance,
step=step,
overrides=declaration.context,
)
if inspect.isawaitable(declaration_result):
declaration_result = await declaration_result
postgen_results[declaration_name] = declaration_result
self.factory_meta.use_postgeneration_results(
instance=instance,
step=step,
results=postgen_results,
)
return instance I'm trying to avoid using asyncio.create_task as I want to control the order in which models instantiated. |
Another version that works with async SQLAlchemy: import factory
from factory.alchemy import SESSION_PERSISTENCE_COMMIT, SESSION_PERSISTENCE_FLUSH
from factory.builder import StepBuilder, BuildStep, parse_declarations
class AsyncFactory(factory.alchemy.SQLAlchemyModelFactory):
@classmethod
async def _generate(cls, strategy, params):
if cls._meta.abstract:
raise factory.errors.FactoryError(
"Cannot generate instances of abstract factory %(f)s; "
"Ensure %(f)s.Meta.model is set and %(f)s.Meta.abstract "
"is either not set or False." % dict(f=cls.__name__))
step = AsyncStepBuilder(cls._meta, params, strategy)
return await step.build()
@classmethod
async def _create(cls, model_class, *args, **kwargs):
for key, value in kwargs.items():
if inspect.isawaitable(value):
kwargs[key] = await value
return await super()._create(model_class, *args, **kwargs)
@classmethod
async def create_batch(cls, size, **kwargs):
return [await cls.create(**kwargs) for _ in range(size)]
@classmethod
async def _save(cls, model_class, session, args, kwargs):
session_persistence = cls._meta.sqlalchemy_session_persistence
obj = model_class(*args, **kwargs)
session.add(obj)
if session_persistence == SESSION_PERSISTENCE_FLUSH:
await session.flush()
elif session_persistence == SESSION_PERSISTENCE_COMMIT:
await session.commit()
return obj
class AsyncStepBuilder(StepBuilder):
# Redefine build function that await for instance creation and awaitable postgenerations
async def build(self, parent_step=None, force_sequence=None):
"""Build a factory instance."""
# TODO: Handle "batch build" natively
pre, post = parse_declarations(
self.extras,
base_pre=self.factory_meta.pre_declarations,
base_post=self.factory_meta.post_declarations,
)
if force_sequence is not None:
sequence = force_sequence
elif self.force_init_sequence is not None:
sequence = self.force_init_sequence
else:
sequence = self.factory_meta.next_sequence()
step = BuildStep(
builder=self,
sequence=sequence,
parent_step=parent_step,
)
step.resolve(pre)
args, kwargs = self.factory_meta.prepare_arguments(step.attributes)
instance = await self.factory_meta.instantiate(
step=step,
args=args,
kwargs=kwargs,
)
postgen_results = {}
for declaration_name in post.sorted():
declaration = post[declaration_name]
declaration_result = declaration.declaration.evaluate_post(
instance=instance,
step=step,
overrides=declaration.context,
)
if inspect.isawaitable(declaration_result):
declaration_result = await declaration_result
postgen_results[declaration_name] = declaration_result
self.factory_meta.use_postgeneration_results(
instance=instance,
step=step,
results=postgen_results,
)
return instance
|
Coming from a Django background and with the async Django ORM added, def willing to add a PR for async capability + Django async capability. The examples above create a new class, which is preferable in most libraries, but I think in this case: creating an "a" prefix method would work best in case someone wants to use both sync and async methods but reuse the declarations. |
If anyone needs a Django version. Note this utilizes Django 4.2's new addition of "asave", but it'll take account for it if you are on a lower version. https://gist.github.com/Andrew-Chen-Wang/59d784496c63ee65714b926d6945b4c6 Factory implementation: import inspect
import factory
from asgiref.sync import sync_to_async
from django.db import IntegrityError
from factory import errors
from factory.builder import BuildStep, StepBuilder, parse_declarations
def use_postgeneration_results(self, step, instance, results):
return self.factory._after_postgeneration(
instance,
create=step.builder.strategy == factory.enums.CREATE_STRATEGY,
results=results,
)
factory.base.FactoryOptions.use_postgeneration_results = use_postgeneration_results
class AsyncFactory(factory.django.DjangoModelFactory):
@classmethod
async def _generate(cls, strategy, params):
if cls._meta.abstract:
raise factory.errors.FactoryError(
"Cannot generate instances of abstract factory %(f)s; "
"Ensure %(f)s.Meta.model is set and %(f)s.Meta.abstract "
"is either not set or False." % dict(f=cls.__name__)
)
step = AsyncStepBuilder(cls._meta, params, strategy)
return await step.build()
class Meta:
abstract = True # Optional, but explicit.
@classmethod
async def _get_or_create(cls, model_class, *args, **kwargs):
"""Create an instance of the model through objects.get_or_create."""
manager = cls._get_manager(model_class)
assert "defaults" not in cls._meta.django_get_or_create, (
"'defaults' is a reserved keyword for get_or_create "
"(in %s._meta.django_get_or_create=%r)"
% (cls, cls._meta.django_get_or_create)
)
key_fields = {}
for field in cls._meta.django_get_or_create:
if field not in kwargs:
raise errors.FactoryError(
"django_get_or_create - "
"Unable to find initialization value for '%s' in factory %s"
% (field, cls.__name__)
)
key_fields[field] = kwargs.pop(field)
key_fields["defaults"] = kwargs
try:
instance, _created = await manager.aget_or_create(*args, **key_fields)
except IntegrityError as e:
get_or_create_params = {
lookup: value
for lookup, value in cls._original_params.items()
if lookup in cls._meta.django_get_or_create
}
if get_or_create_params:
try:
instance = await manager.aget(**get_or_create_params)
except manager.model.DoesNotExist:
# Original params are not a valid lookup and triggered a create(),
# that resulted in an IntegrityError. Follow Django’s behavior.
raise e
else:
raise e
return instance
@classmethod
async def _create(cls, model_class, *args, **kwargs):
"""Create an instance of the model, and save it to the database."""
if cls._meta.django_get_or_create:
return await cls._get_or_create(model_class, *args, **kwargs)
manager = cls._get_manager(model_class)
return await manager.acreate(*args, **kwargs)
@classmethod
async def create_batch(cls, size, **kwargs):
"""Create a batch of instances of the model, and save them to the database."""
return [await cls.create(**kwargs) for _ in range(size)]
@classmethod
async def _after_postgeneration(cls, instance, create, results=None):
"""Save again the instance if creating and at least one hook ran."""
if create and results:
# Some post-generation hooks ran, and may have modified us.
if hasattr(instance, "asave"):
await instance.asave()
else:
await sync_to_async(instance.save)()
class AsyncBuildStep(BuildStep):
async def resolve(self, declarations):
self.stub = factory.builder.Resolver(
declarations=declarations,
step=self,
sequence=self.sequence,
)
for field_name in declarations:
attr = getattr(self.stub, field_name)
if inspect.isawaitable(attr):
attr = await attr
self.attributes[field_name] = attr
class AsyncStepBuilder(StepBuilder):
# Redefine build function that await for instance creation and awaitable postgenerations
async def build(self, parent_step=None, force_sequence=None):
"""Build a factory instance."""
# TODO: Handle "batch build" natively
pre, post = parse_declarations(
self.extras,
base_pre=self.factory_meta.pre_declarations,
base_post=self.factory_meta.post_declarations,
)
if force_sequence is not None:
sequence = force_sequence
elif self.force_init_sequence is not None:
sequence = self.force_init_sequence
else:
sequence = self.factory_meta.next_sequence()
step = AsyncBuildStep(
builder=self,
sequence=sequence,
parent_step=parent_step,
)
await step.resolve(pre)
args, kwargs = self.factory_meta.prepare_arguments(step.attributes)
instance = self.factory_meta.instantiate(
step=step,
args=args,
kwargs=kwargs,
)
if inspect.isawaitable(instance):
instance = await instance
postgen_results = {}
for declaration_name in post.sorted():
declaration = post[declaration_name]
declaration_result = declaration.declaration.evaluate_post(
instance=instance,
step=step,
overrides=declaration.context,
)
if inspect.isawaitable(declaration_result):
declaration_result = await declaration_result
postgen_results[declaration_name] = declaration_result
postgen = self.factory_meta.use_postgeneration_results(
instance=instance,
step=step,
results=postgen_results,
)
if inspect.isawaitable(postgen):
await postgen
return instance |
@B3QL I've defined a Person as such: class PersonFactory(AsyncFactory):
class Meta:
model = Person
id = factory.Faker("uuid4")
first_name = factory.Faker("first_name")
last_name = factory.Faker("last_name")
dob = factory.Faker("date_of_birth", minimum_age=18, maximum_age=90)
gender = factory.Faker("random_element", elements=("Male", "Female")) I'm using pytest, my models are defined with SQLAlchemy 2, and my DB connections are async. Here's the fixture I'm using to get my DB sessions during tests: @pytest.fixture
async def dbsession(
_engine: AsyncEngine,
) -> AsyncGenerator[AsyncSession, None]:
connection = await _engine.connect()
trans = await connection.begin()
session_maker = async_sessionmaker(
connection,
expire_on_commit=False,
)
session = session_maker()
try:
yield session
finally:
await session.close()
await trans.rollback()
await connection.close() Now, if I try to use @pytest.mark.anyio
async def test_videomeeting_creation(
fastapi_app: FastAPI,
client: AsyncClient,
dbsession: AsyncSession,
) -> None:
person_factory = PersonFactory()
person = await person_factory.create()
... I hit this error, since
My temporary solution is a class construction function that allows me to pass my session from inside my tests: def get_person_factory(dbsession: AsyncSession):
class PersonFactory(AsyncFactory):
class Meta:
model = Person
sqlalchemy_session = dbsession
id = factory.Faker("uuid4")
first_name = factory.Faker("first_name")
last_name = factory.Faker("last_name")
dob = factory.Faker("date_of_birth", minimum_age=18, maximum_age=90)
gender = factory.Faker("random_element", elements=("Male", "Female"))
return PersonFactory But I can't help but feel this is not how you intended it to be used... |
I found this python package that implements some of the suggestions in this thread: https://github.com/kuzxnia/async_factory_boy/ |
Is official async support planned for this project? |
Async + Django testing (pytest) is a mess. We moved away from it altogether. |
Are you going to add async support for the SQLAlchemy? https://github.com/kuzxnia/async_factory_boy this one is pretty raw |
For anyone who comes here looking for django, I used @Andrew-Chen-Wang code here + added support for subfactory and it works perfectly: class AsyncSubFactory(factory.SubFactory):
async def evaluate(self, instance, step, extra):
subfactory = self.get_factory()
force_sequence = step.sequence if self.FORCE_SEQUENCE else None
return await step.recurse(subfactory, extra, force_sequence=force_sequence) |
Hi, let me share my solution. My usecase is SQLAlchemy + asyncpg, I wanted to use factory-boy in test cases with reset the DB data in every test cases. I did brief override.
from factory.alchemy import SQLAlchemyModelFactory
from factory import Faker
from sqlalchemy.ext.asyncio import AsyncSession
from my.models import User
class BaseFactory(SQLAlchemyModelFactory):
class Meta:
abstract = True
sqlalchemy_session_persistence = "flush"
@classmethod
async def create(cls, session: AsyncSession, commit: bool = False, **overrides):
obj = cls.build(**overrides)
session.add(obj)
if commit:
await session.commit()
else:
await session.flush()
return obj
@classmethod
async def create_batch(cls, session: AsyncSession, size: int, **overrides) -> list[User]:
return [await cls.create(session, **overrides) for _ in range(size)]
class UserFactory(BaseFactory):
class Meta:
model = User
email = Faker("email")
hashed_password = "hashed_password"
is_active = True
first_name = Faker("first_name")
last_name = Faker("last_name") test code is like this: from httpx import AsyncClient
import pytest
from factory import UserFactory
@pytest.fixture()
async def session():
# init session instance
engine = create_async_engine(settings.DATABASE_URL, echo=False)
async with engine.connect() as conn:
await conn.begin()
await conn.begin_nested()
AsyncSessionLocal = async_sessionmaker(
autoflush=False,
bind=conn,
future=True,
class_=AsyncSession,
)
async_session = AsyncSessionLocal()
yield session
await async_session.close()
await conn.rollback()
await async_engine.dispose()
@pytest.fixture()
async def client():
async with AsyncClient(base_url="http://localhost", transport=ASGITransport(app=app)) as client:
yield client
@pytest.mark.asyncio
async def test_case(client, session):
u = await UserFactory.create(session)
response = await client.get("/users")
assert response.status_code == 200 |
Idiomatic implementation for SQLAlchemy using greenlet:
from typing import Any
from factory import alchemy, enums
from factory.alchemy import SQLAlchemyOptions
from factory.base import Factory, FactoryMetaClass, StubObject, T
from factory.errors import UnknownStrategy
from sqlalchemy.util import await_only, greenlet_spawn
class AsyncFactoryMetaClass(FactoryMetaClass): # type: ignore[misc]
async def __call__(cls, **kwargs: Any) -> T | StubObject: # noqa: ANN401,N805
if cls._meta.strategy == enums.BUILD_STRATEGY:
return cls.build(**kwargs)
if cls._meta.strategy == enums.CREATE_STRATEGY:
return await cls.create(**kwargs)
if cls._meta.strategy == enums.STUB_STRATEGY:
return cls.stub(**kwargs)
raise UnknownStrategy(f"Unknown '{cls.__name__}.Meta.strategy': {cls._meta.strategy}")
class AsyncSQLAlchemyModelFactory(Factory, metaclass=AsyncFactoryMetaClass): # type: ignore[misc]
_options_class = SQLAlchemyOptions
class Meta:
abstract = True
@classmethod
async def create(cls, **kwargs: Any) -> T: # noqa: ANN401
return await greenlet_spawn(cls._generate, enums.CREATE_STRATEGY, kwargs)
@classmethod
async def create_batch(cls, size: int, **kwargs: Any) -> list[T]: # noqa: ANN401
return [await cls.create(**kwargs) for _ in range(size)]
@classmethod
def _create(cls, model_class: type[Any], *args: Any, **kwargs: Any) -> T: # noqa: ANN401
meta = cls._meta
session = meta.sqlalchemy_session
if session is None and (session_factory := meta.sqlalchemy_session_factory) is not None:
session = session_factory()
if not session:
class_name = cls.__name__
raise RuntimeError(
f"No session: "
f"set '{class_name}.Meta.sqlalchemy_session' or '{class_name}.Meta.sqlalchemy_session_factory'"
)
instance = model_class(*args, **kwargs)
session.add(instance)
session_persistence = meta.sqlalchemy_session_persistence
if session_persistence == alchemy.SESSION_PERSISTENCE_FLUSH:
await_only(session.flush())
elif session_persistence == alchemy.SESSION_PERSISTENCE_COMMIT:
await_only(session.commit())
return instance Usage example:
import threading
from sqlalchemy.ext import asyncio as sa_asyncio
# https://docs.sqlalchemy.org/en/20/orm/contextual.html
# https://factoryboy.readthedocs.io/en/stable/orms.html#managing-sessions
async_scoped_session = sa_asyncio.async_scoped_session(sa_asyncio.async_sessionmaker(), threading.get_ident)
from factory import Sequence, alchemy
from my_project.models import MyModel
from tests import database
from tests.factory import AsyncSQLAlchemyModelFactory
# https://factoryboy.readthedocs.io/en/stable/recipes.html
class MyModelFactory(AsyncSQLAlchemyModelFactory):
class Meta:
model = MyModel
sqlalchemy_session_factory = database.async_scoped_session
sqlalchemy_session_persistence = alchemy.SESSION_PERSISTENCE_FLUSH
id = Sequence(lambda i: i + 1)
from sqlalchemy.ext.asyncio import AsyncSession
from tests.factories import MyModelFactory
async def test(async_scoped_session: AsyncSession):
my_model = await MyModelFactory()
my_models = await MyModelFactory.create_batch(3) |
The problem
Coming from Django where we used Factory Boy really a lot to a new, async stack to fully support GraphQL with subscriptions which are really cool (uvicorn + Starlette + Ariadne) we also switched to async ORM (not really an ORM) named GINO. It is based on SQLAlchemy Core and works pretty robust. However, I am struggling to adapt Factory Boy to use GINO models.
Proposed solution
At first glance I thought that I need to implement
_create()
method in my factory model but the problem is that thecreate()
method for GINO model is a coroutine and can't be called from a synchronous code. I tried to experiment withasyncio._get_running_loop()
but I am really new to async stuff and my attempt failed.Extra notes
I am using pytest with pytest-asyncio plugin to run tests with async code which works pretty well including working with DB. For that I have this in my conftest.py:
I really miss Factory Boy and hope there is an easy solution to start my factories again.
I also created an issue for GINO here python-gino/gino#608 but decided to open one here too as I think Factory Boy developed a much wider community and I have better chances that someone has the same problem as I do. Thanks all!
The text was updated successfully, but these errors were encountered: