bpo-33533: Provide an async iterator version of as_completed #22491

Open
wants to merge 13 commits into main
Conversation

@JustinTArthur (Contributor) commented Oct 2, 2020

  • as_completed returns an object that is both
    • an iterator (of coroutines) like before
    • an async iterator of futures (including any futures passed in) (new)
  • Existing tests have been adjusted to test both the old and new style.
    • test_as_completed_reverse_wait was left alone because it specifically tests for quirks of the older style, where multiple awaitables can be yielded before being awaited, but each returned awaitable always represents the next-completed task regardless of the order in which it was yielded.
  • New test_as_completed_resume_iterator test added to ensure iterator can be resumed in both styles.
  • New test_as_completed_same_tasks_in_as_out test added to ensure tasks passed to the new style are yielded back as-is.
  • Documentation and what's new updated.

Example:

urls = 'https://www.python.org/', 'https://www.mozilla.org/'
task_urls = {
    asyncio.create_task(http_get(url)): url
    for url in urls
}

async for task in asyncio.as_completed(task_urls.keys()):
    completed_url = task_urls[task]
    print(f'Finished {completed_url}!')
    process_page(await task)

This is similar to #10251, but that PR's implementation does not have as_completed return an iterator, so the object's iteration could not be resumed with subsequent for statements or calls to next(). I don't know how widely this is done, but I wanted to ensure backwards compatibility.
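For illustration, here is a minimal sketch of the plain-iterator resumption pattern that this change aims to keep working; the coroutine names are made up for this example and are not part of the patch:

import asyncio

async def work(n):
    await asyncio.sleep(n / 10)
    return n

async def main():
    completed = asyncio.as_completed([work(3), work(1), work(2)])

    # Grab just the first finished result with next(), ...
    first = await next(completed)
    print('first:', first)

    # ... then resume the very same iterator with a later for statement.
    for coro in completed:
        print('then:', await coro)

asyncio.run(main())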

https://bugs.python.org/issue33533

#77714

@JustinTArthur (Contributor, Author) commented Oct 3, 2020

Replaced the commit to conform the new async iterator to the format originally suggested in bpo-33533, where yielded objects are awaitables (including any of the original Futures passed in). I also added Docs adjustments, What's New, and NEWS.d.

@JustinTArthur changed the title from "bpo-33533: Provide an async-generator version of as_completed" to "bpo-33533: Provide an async iterator version of as_completed" on Oct 3, 2020
* as_completed returns object that is both iterator and async iterator
* Existing tests adjusted to test both the old and new style
* New test to ensure iterator can be resumed
* New test to ensure async iterator yields any passed-in Futures as-is
Follows the revisions made based on @hniksic's feedback.
self._todo = todo
self._todo_left = len(todo)

def __aiter__(self):
Contributor commented:
Should we take care to remove the done callbacks when the iteration is complete? That kind of thing would be possible if __aiter__ were a generator:

async def __aiter__(self):
    try:
        while True:
            ...  # check _todo_left etc.
            yield await self._wait_for_one()
    finally:
        ...  # remove done callbacks off the _todo futures

If someone calls as_completed on multiple futures and abandons iteration before the end, it would be nice to clear the callbacks. For example, if this function were called multiple times with the same set of futures, the number of callbacks on them would accumulate:

async def process(futs):
    async for fut in as_completed(futs):
        ... 
        if some_condition:
            break

@1st1 what do you think?

@JustinTArthur (Contributor, Author) replied:

I think cleaning up the callbacks on iterator close makes sense. We could at least put the cleanup in __del__. If we decide to deprecate the plain iterator form, we can eventually implement the whole as_completed function as an asynchronous generator.
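A rough sketch of that __del__ idea; the class and attribute names here are hypothetical, not the actual patch:

class _AsCompletedIterator:
    ...

    def __del__(self):
        # Best-effort cleanup: detach our completion callback from any
        # futures that were never yielded before iteration was abandoned.
        for fut in self._todo:
            fut.remove_done_callback(self._handle_completion)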

Contributor replied:

It should be fine to make __aiter__ return a generator because as_completed never returned an async-iterable to begin with. Regarding the implementation, note that asyncio has dedicated machinery for cleaning up async iteration, which seems to be based on async generators and their aclose method. I'm not an expert in the area, but it's probably a better bet to make use of that mechanism than to rely on __del__.

As for __iter__, I think it's safe to leave its behavior unchanged, both for ease of implementation and to avoid affecting existing code in any way.
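For reference, a small sketch of the aclose()-based cleanup being referred to: asyncio.run() finalizes leftover async generators via loop.shutdown_asyncgens(), which ends up running their finally blocks. (The generator here is a toy example, not the proposed implementation.)

import asyncio

async def agen():
    try:
        while True:
            yield 1
            await asyncio.sleep(0)
    finally:
        print('cleanup ran')  # reached via aclose(), not __del__

async def main():
    it = agen()
    async for _ in it:
        break              # abandon iteration early
    await it.aclose()      # explicit aclose(); the loop's asyncgen hooks
                           # would otherwise do this at shutdown

asyncio.run(main())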

@JustinTArthur (Contributor, Author) replied:

> I think it's safe to leave its behavior unchanged, both for ease of implementation and to avoid affecting existing code in any way.

I'm OK with us doing better cleanup on the older pattern, as I can't imagine any easy way for user code to have accessed or relied on the leftover callbacks, at least not through public interfaces.

> It should be fine to make __aiter__ return a generator because as_completed never returned an async-iterable to begin with.

You're right. I tried generators for both __iter__ and __aiter__, following your example of using the parent iterator's _todo, and was able to get it to work while maintaining backwards compatibility. I'm not keen on this approach, though, as you end up with an extra object (the generator).

> it's probably a better bet to make use of that mechanism than to rely on __del__.

I actually prefer __del__ in this case: we aren't doing any cleanup actions that require an event loop, whereas a generator's aclose/finally requires an event loop to finalize. As mentioned above, I also like the idea of cleaning up the older-style iteration.

Here's another quirk that comes with making __aiter__ a generator: resumption using async for loops ends up iterating over a new iterator (the generator) every time. Example:

async def do_queries():
    dns_requests = asyncio.as_completed((
        resolver_a_try_ipv4,
        resolver_b_try_ipv4,
        resolver_a_try_ipv6,
        resolver_b_try_ipv6,
    ))
    async for request in dns_requests:
        result = await request
        enqueue_result_for_parsing(result)
        break

    async for request in dns_requests:
        log_slow_result(request)

    # The first generator is now garbage collected without being exhausted or aclosed.
    # CTask will throw "Task was destroyed but it is pending!" into the loop exception handler.

and if we put cleanup in the second-layer generators, it won't get called for the single-iteration use pattern:

first_result = await (await asyncio.as_completed(requests).__anext__())
# Generator is never created nor is its `finally` block run.

# Or w/ plain iterator:
first_result = await next(asyncio.as_completed(requests))
# Generator is never created nor is its `finally` block run.

Member commented:

I do not think that __del__ works here. There are reference loops for every future: as_completed iterator -> todo set -> future -> _handle_completion bound method -> as_completed iterator. __del__ can only be called once all of these loops are broken at some link, for example when the todo set is cleared, or the futures are cleared, or the bound methods are cleared, but in that case __del__ does nothing.

Such reference loops exist in the current generator-based code too. It seems they are inevitable.
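A rough sketch of that cycle, with hypothetical names (the real class in the patch may differ): the iterator reaches the futures through its todo set, and each future reaches the iterator back through its bound-method done callback.

class _AsCompleted:
    def __init__(self, futures):
        # iterator -> todo set -> future
        self._todo = set(futures)
        for fut in self._todo:
            # future -> bound method -> iterator (closing the loop)
            fut.add_done_callback(self._handle_completion)

    def _handle_completion(self, fut):
        self._todo.discard(fut)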

@JustinTArthur (Contributor, Author) commented:
@1st1 or @asvetlov, if I take the time to resolve the conflicts again, will either of you have time in the next week or two to review? If not, let me know if there's another core developer you'd recommend to take a look.

@mbello commented Apr 23, 2021

The issue that motivated this PR is almost 3 years old now and it really is something I wish had been merged already.

Can we still hope to have it in for 3.10?

@JustinTArthur (Contributor, Author) commented Apr 23, 2021

I will likely bring it up on the mailing lists. Other than at-mentioning @asvetlov and @1st1 again, I don't know what else I can do at this point; they are the core developers responsible for asyncio, last I checked.

@mbello commented May 24, 2021

Is there anything that members of the python community can do to get this issue higher on the priority list of reviewers?

@septatrix commented Oct 15, 2021

> Is there anything that members of the python community can do to get this issue higher on the priority list of reviewers?

Probably nothing more than upvoting this PR (which I just did).

@MaxwellDupre (Contributor) commented:
Hi, could you re-target this for 3.12 (the What's New entry currently says 3.10), please?

@gvanrossum (Member) commented:
@serhiy-storchaka Do you need my review?

@serhiy-storchaka (Member) commented:
> Do you need my review?

Take a look, please. The main difference from the original code (apart from resolving many conflicts and fixing tests) is that I removed the __del__() methods, which looked useless. But I have not added close() and aclose() methods, because in the current code close() does not do anything useful. We can add them later, once we decide on the semantics.

There is a small breaking change. Now a running event loop must exist when as_completed() is called, and the input iterator is consumed at the time as_completed() is called. Previously, as_completed() could be called without an event loop; the loop was only required at iteration time. Also, the input iterator was consumed only after iteration started. This is the nature of generator functions: the code is executed only when iteration starts.
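To illustrate the difference (assuming the semantics described in this comment, not necessarily the final merged behavior):

import asyncio

async def job(n):
    await asyncio.sleep(0)
    return n

async def main():
    # With this change, as_completed() must be called while the loop is
    # running, and the input iterable (a generator expression here) is
    # consumed immediately at this call.
    iterator = asyncio.as_completed(job(n) for n in range(3))
    for coro in iterator:
        print(await coro)

asyncio.run(main())

# Previously, asyncio.as_completed(...) could also be called before the loop
# started, because the generator-based implementation ran no code until the
# first iteration began.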
