Pyramid: Multi-threaded Database Operation


1

My application receives one or more URLs (typically 3-4) from the user, scrapes certain data from them, and writes that data to the database. Because the scraping takes a little while, I was thinking of running each scrape in a separate thread, so that the scraping and the database writes carry on in the background and the user does not have to wait.

To implement that, I have (relevant parts only):

@view_config(route_name="add_movie", renderer="templates/add_movie.jinja2")
def add_movie(request):
    post_data = request.POST

    if "movies" in post_data:
        movies = post_data["movies"].split(os.linesep)

        for movie_id in movies:        
            movie_thread = Thread(target=store_movie_details, args=(movie_id,))
            movie_thread.start()

    return {}

def store_movie_details(movie_id):

    movie_details = scrape_data(movie_id)
    new_movie = Movie(**movie_details) # Movie is my model.

    print new_movie  # Works fine.

    print DBSession.add(movies(**movie_details))  # Returns None.

While the line print new_movie does print the correct scraped data, DBSession.add() doesn't work. In fact, it just returns None.

If I remove the threads and just call the method store_movie_details(), it works fine.

What's going on?

2014-07-11 19:41
by tripatheea


2

Firstly, the SQLAlchemy docs on Session.add() do not mention a return value: the method only registers the object with the session, so returning None is expected and does not by itself indicate a failure.

Secondly, I think you meant to add new_movie to the session, not movies(**movie_details), whatever that is :)

Thirdly, the standard Pyramid session (the one configured with ZopeTransactionExtension) is tied to Pyramid's request-response cycle, which may produce unexpected behavior in your situation. You need to configure a separate session which you will need to commit manually in store_movie_details. This session needs to use scoped_session so the session object is thread-local and is not shared across threads.

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

session_factory = sessionmaker(bind=some_engine)
AsyncSession = scoped_session(session_factory)

def store_movie_details(movie_id):

    session = AsyncSession()
    movie_details = scrape_data(movie_id)
    new_movie = Movie(**movie_details) # Movie is my model.

    session.add(new_movie)
    session.commit()

And, of course, this approach is only suitable for very light-weight tasks, and if you don't mind occasionally losing a task (when the webserver restarts, for example). For anything more serious have a look at Celery etc. as Antoine Leclair suggests.
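For the thread-based version above, here is a slightly more defensive sketch of the same idea. It assumes SQLAlchemy 1.4 or later (for the declarative_base import from sqlalchemy.orm) and uses a temporary SQLite file as a stand-in for the real database. The Movie columns, the scrape_data stub, and the store_all and stored_titles helpers are hypothetical and exist only to make the example self-contained. The additions over the snippet above are a rollback on failure and a call to AsyncSession.remove() so each thread discards its thread-local session when it finishes.

```python
import os
import tempfile
from threading import Thread

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

Base = declarative_base()


class Movie(Base):
    # Hypothetical stand-in for the real Movie model.
    __tablename__ = "movies"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)


# Assumption: a throwaway SQLite file replaces the application's engine.
db_path = os.path.join(tempfile.mkdtemp(), "movies.db")
engine = create_engine("sqlite:///" + db_path)
Base.metadata.create_all(engine)

# Separate from Pyramid's DBSession: no ZopeTransactionExtension,
# so each thread has to commit on its own.
AsyncSession = scoped_session(sessionmaker(bind=engine))


def scrape_data(movie_id):
    # Hypothetical stub; the real function would fetch and parse a page.
    return {"title": "Movie " + movie_id}


def store_movie_details(movie_id):
    session = AsyncSession()  # thread-local session for this thread
    try:
        session.add(Movie(**scrape_data(movie_id)))
        session.commit()
    except Exception:
        session.rollback()  # leave the connection in a clean state
        raise
    finally:
        AsyncSession.remove()  # close and discard this thread's session


def store_all(movie_ids):
    # Demo helper: starts one thread per ID and waits for all of them.
    threads = [Thread(target=store_movie_details, args=(movie_id,))
               for movie_id in movie_ids]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


def stored_titles():
    # Demo helper: reads the titles back, sorted for a stable order.
    session = AsyncSession()
    try:
        return sorted(title for (title,) in session.query(Movie.title))
    finally:
        AsyncSession.remove()
```

In a real view you would start the threads and return without joining them; store_all joins here only so the result can be inspected right away, for example with stored_titles() after store_all(["tt1", "tt2", "tt3"]).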

2014-07-11 20:46
by Sergey
That worked perfectly. Thanks a lot - tripatheea 2014-07-12 07:40


0

The transaction manager closes the transaction when the response is returned. DBSession has no transaction in the other threads when the response has been returned. Also, it's probably not a good idea to share a transaction across threads.

This is a typical use case for a worker. Check out Celery or RQ.
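To illustrate the worker pattern itself, here is a minimal in-process sketch using only the standard library. It is not the Celery or RQ API, and unlike those tools it loses queued tasks when the process restarts. The scrape_data stub, the results list standing in for the database, and the enqueue_movies helper are all hypothetical. The point is only that the request handler puts IDs on a queue and returns, while a long-lived worker does the slow part outside the request's transaction.

```python
import queue
import threading

task_queue = queue.Queue()
results = []  # hypothetical stand-in for the database


def scrape_data(movie_id):
    # Hypothetical stub; the real function would fetch and parse a page.
    return {"title": "Movie " + movie_id}


def worker():
    # Long-lived loop: takes one movie ID at a time off the queue.
    while True:
        movie_id = task_queue.get()
        try:
            if movie_id is None:  # sentinel value: stop the worker
                return
            results.append(scrape_data(movie_id))
        finally:
            task_queue.task_done()


# A daemon thread does not keep the process alive at shutdown.
worker_thread = threading.Thread(target=worker, daemon=True)
worker_thread.start()


def enqueue_movies(movie_ids):
    # What the view would do: queue the work and return immediately.
    for movie_id in movie_ids:
        task_queue.put(movie_id)
```

A view would call enqueue_movies(movies) and return its response straight away. Calling task_queue.join() blocks until the queue is drained, which is mainly useful in tests.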

2014-07-11 20:26
by Antoine Leclair