
Async Queue for executing a bunch of asynchronous delegates in sequence (or parallel)

Hey guys, I made a small utility class to run asynchronous functions that accept callbacks.
I use it for all sorts of things: loading assets, waiting until a character arrives at some spot, waiting until the camera finishes scrolling, or waiting for the player to close a modal pop-up.
This has nothing to do with Unreal’s queue system for doing heavy computations on different threads. It doesn’t do anything with threads at all; it just waits for one function to finish before calling the next one.

Think of it as latent blueprint nodes but for C++.

FAsyncQueue can be used to run asynchronous delegates in sequence, parallel and combinations of the above.

Use Add() to enqueue delegates matching FAsyncDelegate signature:
a void function that accepts a single argument of another void function with no arguments.
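To make that signature concrete, here is a minimal sketch in plain C++ that uses std::function instead of Unreal delegates. The aliases Callback and AsyncTask are only stand-ins for FCallbackDelegate and FAsyncDelegate, and FakeTimer and MakeLoggingTask are hypothetical helpers invented for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for FCallbackDelegate: a void function with no arguments.
using Callback = std::function<void()>;
// Stand-in for FAsyncDelegate: a void function that receives the callback
// it must invoke once its work has finished.
using AsyncTask = std::function<void(const Callback&)>;

// Hypothetical replacement for a timer: it parks completion callbacks
// until Fire() is called, the way a real timer fires later.
struct FakeTimer
{
    std::vector<Callback> Pending;

    void Fire()
    {
        std::vector<Callback> Ready = std::move(Pending);
        Pending.clear();
        for (const Callback& Done : Ready)
        {
            Done();
        }
    }
};

// Builds a task that logs its start and finishes only when the timer fires.
AsyncTask MakeLoggingTask(std::vector<std::string>& Log, FakeTimer& Timer, std::string Name)
{
    return [&Log, &Timer, Name](const Callback& Done)
    {
        assert(Done && "a task must be given a completion callback");
        Log.push_back("Starting " + Name);
        Timer.Pending.push_back(Done); // Done runs later, not now
    };
}
```

The important part is that the task returns immediately and whoever started it only continues once Done is called.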

Static factories MakeSync, MakeSequence and MakeParallel can be used to wrap different types of delegates and
delegate collections into a single FAsyncDelegate which can be enqueued with Add().
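For readers who want to see how such wrappers can work, below is a rough sketch in plain C++. It is not the code from the gist: the std::function aliases stand in for the Unreal delegate types, RunFrom is a hypothetical helper, and the function names are borrowed from the post only for readability.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using Callback = std::function<void()>;
using AsyncTask = std::function<void(const Callback&)>;
using TaskList = std::vector<AsyncTask>;

// Wrap a plain function: run it, then report completion right away.
AsyncTask MakeSync(Callback Work)
{
    return [Work](const Callback& Done) { Work(); Done(); };
}

// Hypothetical helper: start Tasks[Index]; when it reports back, move on.
void RunFrom(std::shared_ptr<TaskList> Tasks, std::size_t Index, Callback Done)
{
    if (Index == Tasks->size())
    {
        Done();
        return;
    }
    (*Tasks)[Index]([Tasks, Index, Done]() { RunFrom(Tasks, Index + 1, Done); });
}

// One task that runs the given tasks strictly one after another.
AsyncTask MakeSequence(TaskList Tasks)
{
    auto Shared = std::make_shared<TaskList>(std::move(Tasks));
    return [Shared](const Callback& Done) { RunFrom(Shared, 0, Done); };
}

// One task that starts all given tasks at once and finishes with the last.
AsyncTask MakeParallel(TaskList Tasks)
{
    auto Shared = std::make_shared<TaskList>(std::move(Tasks));
    return [Shared](const Callback& Done)
    {
        if (Shared->empty())
        {
            Done();
            return;
        }
        auto Remaining = std::make_shared<std::size_t>(Shared->size());
        for (const AsyncTask& Task : *Shared)
        {
            Task([Remaining, Done]()
            {
                assert(*Remaining > 0 && "a task reported completion twice");
                if (--*Remaining == 0)
                {
                    Done();
                }
            });
        }
    };
}
```

Because every wrapper returns the same task type, they nest freely, which is what lets the example in the post put three sequences inside one parallel group.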

Execute() accepts a callback and can be called multiple times. If the queue is already running, Execute does nothing
except store the callback.
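Here is one way those Execute() semantics could look, as a sketch in plain C++. MiniQueue is a hypothetical stand-in, not FAsyncQueue itself. I am assuming that every stored callback fires once the queue drains and that the queue empties itself afterwards; the description above only says that the callback is stored.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using Callback = std::function<void()>;
using AsyncTask = std::function<void(const Callback&)>;

// Hypothetical stand-in for FAsyncQueue (assumed behaviour, see lead-in).
class MiniQueue : public std::enable_shared_from_this<MiniQueue>
{
public:
    // Factory only, so the queue always lives behind a shared pointer.
    static std::shared_ptr<MiniQueue> Create()
    {
        return std::shared_ptr<MiniQueue>(new MiniQueue());
    }

    void Add(AsyncTask Task) { Tasks.push_back(std::move(Task)); }

    void Execute(Callback OnDone)
    {
        assert(OnDone && "Execute expects a valid callback");
        Waiting.push_back(std::move(OnDone));
        if (bRunning)
        {
            return; // already running: only remember the callback
        }
        bRunning = true;
        RunNext();
    }

private:
    MiniQueue() = default;

    void RunNext()
    {
        if (Next == Tasks.size())
        {
            // Queue drained: reset, then notify everyone who called Execute.
            bRunning = false;
            Tasks.clear();
            Next = 0;
            std::vector<Callback> Ready = std::move(Waiting);
            Waiting.clear();
            for (const Callback& OnDone : Ready)
            {
                OnDone();
            }
            return;
        }
        // Weak reference: if the owner drops the queue, the chain just stops.
        std::weak_ptr<MiniQueue> Weak = shared_from_this();
        AsyncTask Task = Tasks[Next++]; // copy, the list may change meanwhile
        Task([Weak]()
        {
            if (std::shared_ptr<MiniQueue> Self = Weak.lock())
            {
                Self->RunNext();
            }
        });
    }

    std::vector<AsyncTask> Tasks;
    std::vector<Callback> Waiting;
    std::size_t Next = 0;
    bool bRunning = false;
};
```

The weak reference is a design choice of this sketch: a queue nobody owns any more simply stops instead of touching freed memory, so the owner has to keep it alive until it finishes.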

The example below will output:


START
Starting Long Task ASYNC
//10 seconds later
Starting Short Task ASYNC
//1 second later 
Doing Instant Task SYNC
Starting Longest Parallel ASYNC
Starting Shortest Parallel ASYNC
Starting Medium Parallel ASYNC
//1 second later
Finished Shortest Parallel
//1 second later (2 seconds from parallel tasks started) 
Finished Medium Parallel
//8 seconds later (10 seconds from parallel tasks started)
Finished Longest Parallel
DONESKIES

The example itself:



 // Don't store the Queue on the stack or it will get destroyed before it finishes
 // You can't use "new", only a factory method "FAsyncQueue::Create()" which always returns `TSharedRef<FAsyncQueue, ESPMode::ThreadSafe>`
Queue = FAsyncQueue::Create(); 
Queue->Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
{
	UE_LOG(LogTemp, Warning, TEXT("Starting Long Task ASYNC"));
	FTimerHandle FooBar;
	this->GetWorldTimerManager().SetTimer(FooBar, Callback, 10, false);
}));
Queue->Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
{
	UE_LOG(LogTemp, Warning, TEXT("Starting Short Task ASYNC"));
	FTimerHandle FooBar;
	this->GetWorldTimerManager().SetTimer(FooBar, Callback, 1, false);
}));
Queue->Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
{
	UE_LOG(LogTemp, Warning, TEXT("Doing Instant Task SYNC"));
})));

TArray<FAsyncDelegate> ParallelTasks;
TArray<FAsyncDelegate> LongestParallel;
LongestParallel.Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
{
	UE_LOG(LogTemp, Warning, TEXT("Starting Longest Parallel ASYNC"));
	FTimerHandle FooBar;
	this->GetWorldTimerManager().SetTimer(FooBar, Callback, 10, false);
}));
LongestParallel.Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
{
	UE_LOG(LogTemp, Warning, TEXT("Finished Longest Parallel"));
})));
ParallelTasks.Add(FAsyncQueue::MakeSequence(LongestParallel));

TArray<FAsyncDelegate> ShortestParallel;
ShortestParallel.Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
{
	UE_LOG(LogTemp, Warning, TEXT("Starting Shortest Parallel ASYNC"));
	FTimerHandle FooBar;
	this->GetWorldTimerManager().SetTimer(FooBar, Callback, 1, false);
}));
ShortestParallel.Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
{
	UE_LOG(LogTemp, Warning, TEXT("Finished Shortest Parallel"));
})));
ParallelTasks.Add(FAsyncQueue::MakeSequence(ShortestParallel));


TArray<FAsyncDelegate> MediumParallel;
MediumParallel.Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
{
	UE_LOG(LogTemp, Warning, TEXT("Starting Medium Parallel ASYNC"));
	FTimerHandle FooBar;
	this->GetWorldTimerManager().SetTimer(FooBar, Callback, 2, false);
}));
MediumParallel.Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
{
	UE_LOG(LogTemp, Warning, TEXT("Finished Medium Parallel"));
})));
ParallelTasks.Add(FAsyncQueue::MakeSequence(MediumParallel));

Queue->Add(FAsyncQueue::MakeParallel(ParallelTasks));

UE_LOG(LogTemp, Warning, TEXT("START"));
Queue->Execute(FCallbackDelegate::CreateLambda([]()
{
	UE_LOG(LogTemp, Warning, TEXT("DONESKIES"));
}));


Here’s the source: Utility class for asynchronous/coroutine style programming in UE4 C++ · GitHub

Looking forward to your criticism, suggestions and improvements.

Performance-wise, I have no idea how bad it is. I’ve made no attempt at optimisation, but it runs pretty well in my project.
It uses thread-safe delegates and pointers, but I haven’t tested it on multiple threads.


Does the AsyncQueue code run on a different thread?
If not, then it’s not really async and has the same result as just calling SetTimer directly instead of adding something to the async queue.

The general idea behind async task queue systems is having

  1. Task class (containing the callback of the code to execute)
  2. Task dispatcher (containing worker threads and the queue of tasks)

As UE4 supports C++11, it should be easy to create waitable tasks, tasks returning results, …
I can share code doing this if you want, but the code is part of a standalone C++11 application, not UE4 code. It should be easy to port.

The concept behind this:



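#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// A unit of work: wraps the code to execute.
class ITask
{
public:
    virtual ~ITask() = default;
    virtual void Execute() = 0;
};

class TaskDispatcher
{
public:
    // Called in a loop by every worker thread: take the oldest task and run it.
    void WorkerThreadTick()
    {
        ITask* Task = nullptr;
        {
            std::lock_guard<std::mutex> Lock(QueueMutex);
            if (TaskQueue.empty())
            {
                return;
            }
            Task = TaskQueue.front();
            TaskQueue.pop_front();
        }
        Task->Execute(); // run outside the lock so other workers can dequeue
    }

    // The caller keeps ownership of the task and must keep it alive until it has run.
    void PushTask(ITask* Task)
    {
        std::lock_guard<std::mutex> Lock(QueueMutex);
        TaskQueue.push_back(Task);
    }

private:
    std::mutex QueueMutex;                  // guards TaskQueue
    std::deque<ITask*> TaskQueue;           // first in, first out
    std::vector<std::thread> WorkerThreads; // each one would loop on WorkerThreadTick()
};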


It doesn’t run code on other threads. There’s no reason to do so for the tasks I’m talking about.

One thing I use it for is queuing up cutscenes and animations, for example:

First, scroll the camera to a character.
Make the character play an animation.
Then open a pop-up dialog on top of the character and wait for it to close.
Now, in parallel, move other characters to this character and wait for all of them to finish.

This is very easy to do in blueprints with latent nodes: you write down your code in sequence and blueprints wait for each latent node to finish before executing the next one.

There is no such system for C++ in UE4, so I made one.

You don’t need multiple threads for such a thing.
You could write it as functions calling other functions, but it would turn into spaghetti code which is very hard to read and understand. It leads to confusion, bugs and a bunch of flags storing the current state.
This class allows you to write code sequentially in a simple list, which is much easier to read.
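To illustrate the difference, here is a small sketch in plain C++ with the same three cutscene steps written both ways. Everything in it is hypothetical: Step and Callback are std::function stand-ins for the Unreal delegates, and the steps finish instantly, whereas in a game Done would be called later (when the camera stops, the pop-up closes and so on).

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <vector>

using Callback = std::function<void()>;
using Step = std::function<void(const Callback&)>;
using Log = std::vector<std::string>;

// Hypothetical cutscene step: records its name, then reports completion.
Step MakeStep(Log& Out, std::string Name)
{
    return [&Out, Name](const Callback& Done) { Out.push_back(Name); Done(); };
}

// Style 1: every step starts the next one from inside its own callback.
void PlayNested(Log& Out, const Callback& Finished)
{
    MakeStep(Out, "ScrollCamera")([&Out, Finished]() {
        MakeStep(Out, "PlayAnimation")([&Out, Finished]() {
            MakeStep(Out, "ShowPopup")(Finished);
        });
    });
}

// Helper for style 2: run Steps[Index], then continue with the rest.
void RunInOrder(std::shared_ptr<std::vector<Step>> Steps, std::size_t Index, Callback Finished)
{
    assert(Index <= Steps->size());
    if (Index == Steps->size())
    {
        Finished();
        return;
    }
    (*Steps)[Index]([Steps, Index, Finished]() { RunInOrder(Steps, Index + 1, Finished); });
}

// Style 2: the same cutscene as a flat list that reads top to bottom.
void PlayAsList(Log& Out, const Callback& Finished)
{
    auto Steps = std::make_shared<std::vector<Step>>();
    Steps->push_back(MakeStep(Out, "ScrollCamera"));
    Steps->push_back(MakeStep(Out, "PlayAnimation"));
    Steps->push_back(MakeStep(Out, "ShowPopup"));
    RunInOrder(Steps, 0, Finished);
}
```

Both functions run the steps in the same order, but adding a fourth step to the nested version means another level of indentation, while the list version only needs one more push_back line.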