Message Bus - How to communicate with other UE4 projects

Those who’ve seen the McLaren Car Configurator and perhaps even the Making-of Twitch Live-Stream are probably wondering: How can I do that?

was kind enough to throw some keywords: The Message Bus

At this point, it seems to be a C++ only thing with no (obviously named) Blueprint nodes to do this, but the C++ part looks easy enough to get a very basic example up and running:
http://i68.tinypic.com/mb4f2h.png
For that sample project, check out BhaaLseN/UnrealMessageBusDemo on GitHub (made in 4.11.2; but the principal idea should work in pretty much any UE4 Version out there). For a brief overview, keep on reading.

The class making all this possible is [FONT=Courier New]FMessageEndpoint](https://docs.unrealengine.com/latest/INT/API/Runtime/Messaging/Helpers/FMessageEndpoint/index.html), and we’re gonna use it on both sides.

For the Server part (the one receiving messages), we can do this:


#include "Messaging.h"

// this is our message class. it must be a UStruct, and can contain any UObject.
// it can also be empty if you don't need parameters.
USTRUCT()
struct FTestMessage
{
    GENERATED_USTRUCT_BODY()

    UPROPERTY(EditAnywhere, Category = "Message")
    FString SomeString;

    // default constructor for the receiver
    FTestMessage() = default;
    // helper constructor for the sender
    FTestMessage(const FString& TheMessage)
        : SomeString(TheMessage) {}
};

class AMessagingServer
{
public:
    // initialize the Endpoint
    void Init();
    // destroy the Endpoint
    void Shutdown();
    // handle FTestMessage
    void HandleTestMessage(const FTestMessage& Message, const IMessageContextRef& Context);
private:
    FMessageEndpointPtr TestEndpoint;
};

void AMessagingServer::Init()
{
    // connect to the message bus. this will return nullptr when something fails.
    TestEndpoint = FMessageEndpoint::Builder("TestEndpointName")
        // while we're at it, attach a handler for FTestMessage. this can be repeated for multiple message types.
        .Handling<FTestMessage>(this, &AMessagingServer::HandleTestMessage);
    if (TestEndpoint.IsValid())
        // start the actual subscription. without it, we won't receive messages sent by Publish() in our handling method.
        TestEndpoint->Subscribe<FTestMessage>();
}
void AMessagingServer::Shutdown()
{
    // disconnect from the message bus and stop listening for messages.
    TestEndpoint.Reset();
}

void AMessagingServer::HandleTestMessage(const FTestMessage& Message, const IMessageContextRef& Context)
{
    UE_LOG(LogTemp, Warning, TEXT("We just received a test message that said: %s"), *Message.SomeString);
}

Now we only need a client that sends the actual message:


#include "Messaging.h"

// same FTestMessage class as on the server.
// need not be the same .h file, just needs the same name and layout.

class AMessagingClient
{
    // initialize the Endpoint
    void Init();
    // destroy the Endpoint
    void Shutdown();
    // trigger FTestMessage
    void SendTestMessage(const FString& TheMessage);
private:
    FMessageEndpointPtr TestEndpoint;
};

void AMessagingClient::Init()
{
    // connect to the message bus. this will return nullptr when something fails.
    TestEndpoint = FMessageEndpoint::Builder("TestEndpointName").Build();
    // TODO: appropriate error handling if something goes wrong :)
}
void AMessagingClient::Shutdown()
{
    // disconnect from the message bus to clean up.
    TestEndpoint.Reset();
}

void AMessagingClient::SendTestMessage(const FString& TheMessage)
{
    if (TestEndpoint.IsValid())
        TestEndpoint->Publish(new FTestMessage(TheMessage));
}

Points of interrest:

With that, I’ll end for now. The Message Bus has a lot more features not shown here, but it should be enough to get you started.
The next interresting thing is probably using [FONT=Courier New]FMessageEndpoint::Send](https://docs.unrealengine.com/latest/INT/API/Runtime/Messaging/Helpers/FMessageEndpoint/Send/1/index.html) along with [FONT=Courier New]IMessageContext::GetSender](IMessageContext::GetSender | Unreal Engine Documentation) to reply directly to the one sending the message. This can also be used to establish a one-to-one connection after discovering the other side.

Thanks again to @gmpreussner for his awesome slides on the Twitch stream that got me started.

Thanks for the great write-up, BhaaL!

A few corrections:

  • Messages can contain any UPROPERTY type except for UObjects. This means integral types (int32, float, etc.), selected container types (TArray, TMap, etc.), and other UStructs. You can nest all these as deeply as you want. Because UClass inherits from UStruct, the serializer may also happily deserialize UObject properties, but the object will most probably not be reconstructed correctly on the receiving end. UObject serialization is quite complicated, and we decided early on to not support it.

  • The endpoint name is actually not relevant for communication. It is only used for debugging, i.e. it is what’s being shown in the Messaging Debugger UI. In particular, the sender and the receiver do not need to have the same name in order to communicate. In fact, you want them to have different names, so they can be distinguished when debugging. Internally, message endpoints are not identified by name, but by GUID. This GUID (internally represented as FMessageAddress) is generated dynamically each time an endpoint is constructed, and it will never be the same between runs of the app. You will rarely have a need to store this address at run-time. Remember, the whole point of the message bus is to be able to communicate with other endpoints without having to know up front how to reach them.

  • Subscription to a message type is only necessary for published messages. If you know the message address of an endpoint, you can Send() a message to it directly, and that receiver only needs to have a Handler registered. Typically, published messages are used only initially to locate a service (“hey, is there anybody out there that can serve requests of this type?”), and after that you’d normally just Send() messages directly to the service provider, and the service provider will Send() replies directly back to you.

  • The “-messaging” parameter is only necessary for (1) game instances and (2) commandlets. The Editor and any standalone Slate application always have messaging enabled by default. This situation is only temporary and will likely become a project setting once we have a more robust (and secure) UDP transport.

Awesome BhaaL!!!

After watching the Twitch session, I’ve been trying to get my head around this and have been fiddling all day with my poor C++ knowlegde ( so far ). Searching one last time before calling it a day I found your post! I will have a look at your code.

Thanks!

Instead of this…



    if (TestEndpoint.IsValid())
        TestEndpoint.Reset();


you can just write…



    TestEndpoint.Reset();


because TSharedPtr::Reset() already checks for IsValid()

Nice, thanks for the hints @gmpreussner. I’ll update the post and source accordingly :slight_smile:

I had to set them on both projects, or it wouldn’t work. Maybe I was missing something there?

I copy/pasted that from the Engine source, so I thought it was necessary :wink:

If you run the Editor as a game (i.e. when doing Standalone PIE) you’ll also have to set it. The check is in FUdpMessagingModule::SupportsNetworkedTransport(). There has been a regression recently, and depending on which version you’re using, the logic may differ. The latest version is:



	bool SupportsNetworkedTransport() const
	{
		// disallow unsupported platforms
		if (!FPlatformMisc::SupportsMessaging())
		{
			return false;
		}

		// single thread support not implemented yet
		if (!FPlatformProcess::SupportsMultithreading())
		{
			return false;
		}

		// always allow in standalone Slate applications
		if (!FApp::IsGame() && !IsRunningCommandlet())
		{
			return true;
		}

		// otherwise only allow if explicitly desired
		return FParse::Param(FCommandLine::Get(), TEXT("Messaging"));
	}


Like I said, I hope to get rid of this at some point, but for now we don’t want the UDP transport to be turned on everywhere, because it still has some issues.

I was using Cooked builds for Win64; tbh. I didn’t try PIE or launching directly from the Editor :slight_smile:
And this is on the launcher version of 4.11.2. I do have a GitHub build lying around, but for the purpose of this post I tried to keep it to launcher builds so everyone can play.

Thanks, this is awesome. I’ll will be using this in a project next week.

A “cgi” for unreal? :stuck_out_tongue:
I’ve not seen the stream yet but it sounds like a great feature!

FYI I put the code for the McLaren demo on GitHub

Note that the implementation is quick and dirty. Use it for inspiration, not as the solution :slight_smile:

,

Thanks for sharing your configurator/companion plugin! It helps a lot in understanding how this works. I implemented your plugin in two test games ( config and companion ) in 4.11. They do connect, but when triggering a command on the companion, nothing happens on the bound event on the configurator. Any idea what I’m missing?

Also implemeting the plugin in 4.12 preview 4 gives a compile error on RpcServer->GetEndPoint(). The GetEndpoint() method doesn’t seem to exist in IMessageRpcServer. But maybe that is because the code for that is not yet implemented in this preview version of 4.12.

Hope you can help me out here, would love to get this working.

Hello there,

if you plan to add http / Json based message capability to Unreal I am voting for choosing WAMP (http://wamp-proto.org/) as a standard for the message protocol. We did something similar with a current project and use Crossbar.io as an external message broker. There are already mature libraries for javascript like autobahn.js that make it easy to create realtime websocket based websites that communicate nicely with Unreal with WAMP.

Nobody would have to write new message protocol libraries for javascript or python if you use an open protocol. We used the Autobahn CPP reference implementation in Unreal which is ok for us from the licensing stuff.

Just a head up, if you didn’t try that yet.

Best regards,

Thanks for this great suggestion, . I had not heard about WAMP before and will check it out this weekend!

Thanks to both of you (BhaaL and ) for this solution.

I’m wondering whether it is possible to handle a messge that has been sent from a Windows application (not an Unreal project). I’d like to control some animation behaviour aspects from an outside appliction.

Hi,

I can also recommend to look into crossbar.io and WAMP. We decided to do some of the logic in an outside application to support a multi-user setup. Works good for us and supports all range of client languages out of the box.

Yeah, it looks promising. I already started working on it, but it will take a while. We’re still in the middle of integrating libwebsocket, and that is a prerequisite for WAMP.

1 Like

I’m trying out the demo (total newbie in the game dev side) - downloaded it - the server side builds fine but the client side pops a message box “MessagingClient could not be compiled. Try rebuilding from source manually”. I’ve built the code in VS - and “appears” to build. When I try running the .exe manually I get a warnings about running with “COOKED” versus “UNCOOKED” version. I can reply with logs etc if needed. Thanks for any advice.
JackJ

Hello, I am also working with WAMP and crossbar.io . Could you share some information about how you interface the subscribe and publish methods with Unreal Engine interface. That would be of great help.

I’m trying to understand how this works, and made a simple actor to verify my assumptions. All it does is the server picks a random target, sets a position variable (and pubs it), and both server and client use that to set the actor location, however the position received is always 0. Any ideas?

edit: The code below now works

.H


#pragma once

#include "Engine.h"
#include "CoreUObject.h"
#include "MessageEndpoint.h"

#include "SyncedActor.generated.h"

USTRUCT()
struct FSyncPositionMessage
{
	GENERATED_BODY()

public:
        UPROPERTY()
	FVector Position;
};

UCLASS()
class ASyncedActor
	: public AActor
{
	GENERATED_BODY()

public:
	ASyncedActor();

	UPROPERTY(VisibleAnywhere)
	FVector Position;

	UPROPERTY(VisibleAnywhere)
	FVector TargetPosition;

	virtual void BeginPlay() override;
	virtual void Tick(float DeltaSeconds) override;
	virtual void BeginDestroy() override;

private:
	TSharedPtr<FMessageEndpoint, ESPMode::ThreadSafe> MessageEndpoint;
	FTimerHandle SelectTargetHandle;

	void SelectTarget();
	void HandlePositionMessage(const FSyncPositionMessage& Message, const TSharedRef<IMessageContext, ESPMode::ThreadSafe>& Context);
};

CPP:


#include "SyncedActor.h"
#include "MessageEndpoint.h"
#include "MessageEndpointBuilder.h"

ASyncedActor::ASyncedActor()
{
	PrimaryActorTick.bCanEverTick = true;

	RootComponent = CreateDefaultSubobject<USceneComponent>(TEXT("Root"));
}

void ASyncedActor::BeginPlay()
{
	Super::BeginPlay();

	MessageEndpoint = FMessageEndpoint::Builder("ASyncedActor")
		.Handling<FSyncPositionMessage>(this, &ASyncedActor::HandlePositionMessage)
		.Build();

	if (!HasAuthority())
		MessageEndpoint->Subscribe<FSyncPositionMessage>();
	else
	{
		SelectTarget();
		GetWorld()->GetTimerManager().SetTimer(SelectTargetHandle, this, &ASyncedActor::SelectTarget, 3.0f, true);
	}
}

void ASyncedActor::Tick(float DeltaSeconds)
{
	Super::Tick(DeltaSeconds);

	if (HasAuthority())
	{
		Position = FMath::Lerp(Position, TargetPosition, DeltaSeconds);

		FSyncPositionMessage* SyncMessage = new FSyncPositionMessage();
		SyncMessage->Position = Position;
		MessageEndpoint->Publish(SyncMessage, EMessageScope::All);
	}

	FVector Size(25, 25, 25);
	this->SetActorLocation(Position);
}

void ASyncedActor::BeginDestroy()
{
	Super::BeginDestroy();
}

void ASyncedActor::SelectTarget()
{
	TargetPosition = FMath::VRand() * 100;
	TargetPosition += FVector(-600, 0, 100);
}

void ASyncedActor::HandlePositionMessage(const FSyncPositionMessage& Message, const TSharedRef<IMessageContext, ESPMode::ThreadSafe>& Context)
{
	GEngine->AddOnScreenDebugMessage(-1, 1.0, FColor::Blue, FString::Printf(TEXT("HandlePositionMessage: x: %f, y: %f, z: %f"), Message.Position.X, Message.Position.Y, Message.Position.Z));

	if (!HasAuthority())
		Position = Message.Position;
}

I haven’t had any time to look into this yet, sorry :frowning: