Interactive Process Communication

Hi everyone

I am trying to communicate with a process constantly.

What i need:
I create the process, which is an external and non-UE4 based application
Through pipes i communicate with it constantly(as long as the game runs), which involves sending and receiving messages.

FMonitoredProcess can actually be used after adding necessery functionality such as WritePipe. But i dont think my computer can handle building the source code.

Than i decided to use FPlatformProcess but i couldn’t find a function to send message.

So i tried to write a class which further extends FPlatformProcess. I confronted many problems and since i can’t find enough information i feel like i am suffocating. I have learned many things from try and see.

Then when i was trying to write some code i saw this class with the help of auto-completion, FPlatformNamedPipe and the most painful thing is it doesnt even have a page at documentation :frowning:

I didn’t know anything about inter-process communication before this. So using FMonitoredProcess as a start point, i tried to create mine. But there are some problems i can’t solve and some questions i have:
1-) The process is terminated about one second later. Happens when i attach WritePipe to process, no idea why.
2-) [DONE] How can i write through pipe so that the program will read it
2-b) Need to implement WriteToPipe for other platforms
3-) [NO IDEA] If i can use FPlatformNamedPipe with FPlatformProcess, how can i do that

This is what i have done till now

FInteractiveProcess header — Updated 2 times the header and source


// Copyright 1998-2015 Epic Games, Inc. All Rights Reserved.

#pragma once

#include "Runtime/Core/Public/GenericPlatform/GenericPlatformProcess.h"
#include "Runtime/Core/Public/Misc/Timespan.h"

/**
* Declares a delegate that is executed when a interactive process completed.
*
* The first parameter is the process return code.
*/
DECLARE_DELEGATE_OneParam(FOnInteractiveProcessCompleted, int32)

/**
* Declares a delegate that is executed when a interactive process produces output.
*
* The first parameter is the produced output.
*/
DECLARE_DELEGATE_OneParam(FOnInteractiveProcessOutput, FString)

/**
* Implements an external process that can be interacted.
*/
class CHESSGAME_API FInteractiveProcess
	: public FRunnable
{
public:

	/**
	* Creates a new interactive process.
	*
	* @param InURL The URL of the executable to launch.
	* @param InParams The command line parameters.
	* @param InHidden Whether the window of the process should be hidden.
	*/
	FInteractiveProcess(const FString& InURL, const FString& InParams, bool InHidden);

	/** Destructor. */
	~FInteractiveProcess();

	/**
	* Cancels the process.
	*
	* @param InKillTree Whether to kill the entire process tree when canceling this process.
	*/
	void Cancel(bool InKillTree = false)
	{
		bCanceling = true;
		bKillTree = InKillTree;
	}

	/**
	* Gets the duration of time that the task has been running.
	*
	* @return Time duration.
	*/
	FTimespan GetDuration() const;

	/**
	* Checks whether the process is still running.
	*
	* @return true if the process is running, false otherwise.
	*/
	bool IsRunning() const
	{
		return (Thread != nullptr);
	}

	/** 
	* Launches the process 
	* 
	* @return True if succeed
	*/
	bool Launch();

	/**
	* Returns a delegate that is executed when the process has been canceled.
	*
	* @return The delegate.
	*/
	FSimpleDelegate& OnCanceled()
	{
		return CanceledDelegate;
	}

	/**
	* Returns a delegate that is executed if the process is terminated without user wanting
	*
	* @return The delegate
	*/
	FSimpleDelegate& OnTerminated()
	{
		return TerminatedDelegate;
	}

	/**
	* Returns a delegate that is executed when a interactive process completed.
	*
	* @return The delegate.
	*/
	FOnInteractiveProcessCompleted& OnCompleted()
	{
		return CompletedDelegate;
	}

	/**
	* Returns a delegate that is executed when a interactive process produces output.
	*
	* @return The delegate.
	*/
	FOnInteractiveProcessOutput& OnOutput()
	{
		return OutputDelegate;
	}

	/**
	* Sends the message when process is ready
	* 
	* @param Message to be send
	*/
	void WriteWhenReady(const FString& Message);

	/**
	* Returns the return code from the exited process
	*
	* @return Process return code
	*/
	int GetReturnCode() const
	{
		return ReturnCode;
	}

	// FRunnable interface

	virtual bool Init() override
	{
		return true;
	}

	virtual uint32 Run() override;

	virtual void Stop() override
	{
		Cancel();
	}

	virtual void Exit() override { }

protected:

	/**
	* Processes the given output string.
	*
	* @param Output The output string to process.
	*/
	void ProcessOutput(const FString& Output);

	// Write message to process through pipe
	bool WriteToPipe();

private:

	// Whether the process is being canceled. */
	bool bCanceling : 1;

	// Whether the window of the process should be hidden. */
	bool bHidden : 1;

	// Whether to kill the entire process tree when cancelling this process. */
	bool bKillTree : 1;

	// Holds the URL of the executable to launch. */
	FString URL;

	// Holds the command line parameters. */
	FString Params;

	// Holds the handle to the process. */
	FProcHandle ProcessHandle;

	// Holds the read pipe. */
	void* ReadPipe;

	// Holds the write pipe. */
	void* WritePipe;

	// Holds the monitoring thread object. */
	FRunnableThread* Thread;

	// Holds the return code. */
	int ReturnCode;

	// Holds the time at which the process started. */
	FDateTime StartTime;

	// Holds the time at which the process ended. */
	FDateTime EndTime;

	// Message to be written to pipe when ready
	FString MessageToProcess;

	// Holds a delegate that is executed when the process has been canceled. */
	FSimpleDelegate CanceledDelegate;

	// Holds a delegate that is executed when the process has been canceled. */
	FSimpleDelegate TerminatedDelegate;

	// Holds a delegate that is executed when a interactive process completed. */
	FOnInteractiveProcessCompleted CompletedDelegate;

	// Holds a delegate that is executed when a interactive process produces output. */
	FOnInteractiveProcessOutput OutputDelegate;
};

FInteractiveProcess source


// Copyright 1998-2015 Epic Games, Inc. All Rights Reserved.

#include "ChessGame.h"
#include "FInteractiveProcess.h"
#include "Runtime/Core/Public/Misc/Paths.h"

FInteractiveProcess::FInteractiveProcess(const FString& InURL, const FString& InParams, bool InHidden)
	: bCanceling(false)
	, bHidden(InHidden)
	, bKillTree(false)
	, URL(InURL)
	, Params(InParams)
	, ReadPipe(nullptr)
	, WritePipe(nullptr)
	, Thread(nullptr)
	, ReturnCode(0)
	, StartTime(0)
	, EndTime(0)
{ }

FInteractiveProcess::~FInteractiveProcess()
{
	if (IsRunning())
	{
		Cancel(true);
		Thread->WaitForCompletion();
		delete Thread;
		Thread = nullptr;
	}
}

FTimespan FInteractiveProcess::GetDuration() const
{
	if (IsRunning())
	{
		return (FDateTime::UtcNow() - StartTime);
	}

	return (EndTime - StartTime);
}

bool FInteractiveProcess::Launch()
{
	if (IsRunning())
	{
		return false;
	}

	if (!FPlatformProcess::CreatePipe(ReadPipe, WritePipe))
	{
		return false;
	}

	ProcessHandle = FPlatformProcess::CreateProc(*URL, *Params, false, bHidden, bHidden, nullptr, 0, nullptr, WritePipe);

	if (!ProcessHandle.IsValid())
	{
		UE_LOG(LogTemp, Warning, TEXT("Failed to launch"));
		return false;
	}

	// Creating name for the process
	static uint32 tempInteractiveProcessIndex = 0;
	ThreadName = FString::Printf(TEXT("FInteractiveProcess %d"), tempInteractiveProcessIndex);
	tempInteractiveProcessIndex++;

	Thread = FRunnableThread::Create(this, *ThreadName);

	return true;
}

void FInteractiveProcess::ProcessOutput(const FString& Output)
{
	TArray<FString> LogLines;

	Output.ParseIntoArray(&LogLines, TEXT("
"), false);

	for (int32 LogIndex = 0; LogIndex < LogLines.Num(); ++LogIndex)
	{
		OutputDelegate.ExecuteIfBound(LogLines[LogIndex]);
	}
}

bool FInteractiveProcess::WriteToPipe()
{
	// If there is not a message
	if (MessageToProcess.Len() == 0)
	{
		return false;
	}
	
	if (!IsRunning())
	{
		return false;
	}

	if (!ProcessHandle.IsValid())
	{
		UE_LOG(LogTemp, Warning, TEXT("Process handle is not valid"));
		return false;
	}

	// Convert tempInput to UTF8CHAR
	uint32 BytesAvailable = MessageToProcess.Len();
	UTF8CHAR* Buffer = new UTF8CHAR[BytesAvailable + 1];
	if (!FString::ToBlob(MessageToProcess, Buffer, BytesAvailable))
	{
		UE_LOG(LogTemp, Warning, TEXT("Failed to convert UTF8CHAR"));
		return false;
	}

	// Empty original FString
	MessageToProcess.Empty(0);

	// Write pipe UTF8CHAR
	uint32 BytesWritten = 0;
	// @todo This doesn't works on any OS other than Windows
	if (!WriteFile(WritePipe, Buffer, BytesAvailable, (::DWORD*)&BytesWritten, nullptr))
	{
		UE_LOG(LogTemp, Warning, TEXT("Failed to write"));
		return false;
	}

	if (BytesAvailable > BytesWritten)
	{
		UE_LOG(LogTemp, Warning, TEXT("Failed to write all of the message"));
	}
	else
	{
		UE_LOG(LogTemp, Log, TEXT("Succesfully write through pipe"));
	}

	return true;
}

void FInteractiveProcess::WriteWhenReady(const FString& Message)
{
	MessageToProcess = Message;
}

// FRunnable interface
uint32 FInteractiveProcess::Run()
{
	// control and interact with the process
	StartTime = FDateTime::UtcNow();
	{
		bool IsProcRunning = false;
		do
		{
			// 1 millisecond sleep is a problem on Windows platform, dont change this
			FPlatformProcess::Sleep(0.0);

			// Read pipe
			ProcessOutput(FPlatformProcess::ReadPipe(ReadPipe));
			
			// Write pipe
			WriteToPipe();

			// If wanted to stop program
			if (bCanceling)
			{
				// close pipes
				FPlatformProcess::ClosePipe(ReadPipe, WritePipe);
				ReadPipe = WritePipe = nullptr;

				FPlatformProcess::TerminateProc(ProcessHandle, bKillTree);
				CanceledDelegate.ExecuteIfBound();

				// get completion status
				if (!FPlatformProcess::GetProcReturnCode(ProcessHandle, &ReturnCode))
				{
					ReturnCode = -1;
				}

				CompletedDelegate.ExecuteIfBound(ReturnCode);
			}

			IsProcRunning = FPlatformProcess::IsProcRunning(ProcessHandle);
		} while (IsProcRunning);
	}
	EndTime = FDateTime::UtcNow();
	UE_LOG(LogTemp, Warning, TEXT("%s terminated"), *ThreadName);

	// Means the process terminated without user wanting
	if (!bCanceling)
	{
		TerminatedDelegate.ExecuteIfBound();
	}

	return 0;
}

Doing do the same job with FMonitoredProcess also terminates program immediately :frowning:

And it is not because the program i am sure of it(It is a chess engine designed to communicate through pipes using UCI)

I am getting this error just before the process termination

LogStats:Warning: MetaData mismatch. Did you assign a stat to two groups? New //STATGROUP_Threads//FInteractiveProcess 0///Thread_1648_0///////STATCAT_Advanced//// old //STATGROUP_Threads//FInteractiveProcess 0///Thread_16fc_0///////STATCAT_Advanced////

But as you can see, i use a static integer and increment its value each time, so there shouldn’t be any threads with the same name. FMonitoredProcess also does the same. Also i am not launching the process more than once.

This is where i use it


// Fill out your copyright notice in the Description page of Project Settings.

#include "ChessGame.h"
#include "StockFishCommunication.h"


// Sets default values
AStockFishCommunication::AStockFishCommunication()
{
 	// Set this actor to call Tick() every frame.  You can turn this off to improve performance if you don't need it.
	PrimaryActorTick.bCanEverTick = true;

	FString tempPathToExecutable = FPaths::GameContentDir();
	tempPathToExecutable.Append(FString("stockfish-6-64.exe"));

	StockFishProcess = new FInteractiveProcess(tempPathToExecutable, FString(""), true);

}

AStockFishCommunication::~AStockFishCommunication()
{
	StockFishProcess->Stop();
	delete StockFishProcess;
}

// Called when the game starts or when spawned
void AStockFishCommunication::BeginPlay()
{
	Super::BeginPlay();
	
	StockFishProcess->Launch();

	StockFishProcess->OnOutput().BindUObject(this, &AStockFishCommunication::ShowMessageAtScreen);
	StockFishProcess->OnTerminated().BindUObject(this, &AStockFishCommunication::ShowTerminatedMessage);

	StockFishProcess->WriteWhenReady(FString("uci"));
}

// Called every frame
void AStockFishCommunication::Tick( float DeltaTime )
{
	Super::Tick( DeltaTime );

}

void AStockFishCommunication::ShowMessageAtScreen(const FString Message)
{
	GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, *Message);
}

void AStockFishCommunication::ShowTerminatedMessage()
{
	GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, TEXT("Process is terminated by an outsider"));
}

void AStockFishCommunication::ShowCompletedMessage()
{
	GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, TEXT("Process is completed"));
}

Ok, at least i confirmed that the problem, i mentioned in my previous post, was nothing to worry about and not the cause why my process gets terminated. Still don’t know what is the cause

Please guys i need help seriously.

At least i now know why the process is terminated. Because the thread i create for the process is terminated. Which results in termination of process. Should not forget unreal is responsible for that thread.

Here is something that shows what happens the moment i press play in the editor.

Why the thread terminates, i have no idea :frowning:

Finally get even more close to the problem.

The process and thread terminates when i attach a write pipe to process, otherwise everything is fine.

Of course without write pipe, it is useless

Can anyone give me a hand with this? :frowning:

Hello Corpse!

This is a very interesting problem! :smiley: I’ve personally done a lot of IPC, but I don’t think I’ve ever seen this error before.
I don’t think I have the time to help you debug, but I just thought I’d suggest a side step solution.
Is there any particular reason you want to use pipes? I’m guessing this is a performance concern, but I would still really recommend using sockets anyways if you aren’t absolutely sure performance will be an issue. Sockets are much easier to handle than pipes and are more versatile if you decide to change your architecture down the line.

If you really want to go for the super performant route but feel you are stuck, you can always check out a shared memory implementation instead, or some other solution like a memory mapped file.

Here is a really lightweight one if you want to try:

Sorry I couldn’t be of more help with your actual question at the moment :slight_smile:

Best regards,
Temaran

After 7 days somebody answered… My tears are coming off now.

Going to look at these options right now

^_^;;

Hope it works out <3

There is a really nice Socket implementation in the engine btw, I tried it a couple of months back and got it up and running in a couple of hours or so.
I even have the test project lying around somewhere if you want the source. It’s kind of woven in with some sensitive code, but I can probably extract it if you want me to >_>.

/Temaran

No need, thanks anyways

This is the first time i am working with IPC.

Can a console program handle pipes or sockets(Chess engine, stockfish). I mean i think it would since all these uses input-output logic, but this pipe problem made me question many things.

EDIT: Ok, this is a silly question :slight_smile: I started to doubt myself too much.

Hello!

There is nothing that prevents a console program from neither sockets nor pipes.
The easiest way to set it up if you want to access the data from a console app, you would just poll the API in some kind of loop and sleep between each try if you need to get data continuously, or if you just need to read once, you can do that during startup if you have a guaranteed process order, or when the user enters a command through stdin or the like.

If you want to use winsock or something like that, it would look something like this:




#include "resource.h"
#pragma comment (lib, "Ws2_32.lib")

#define DEFAULT_BUFLEN 512
#define DEFAULT_PORT 54243

//... STUFF

	SOCKET s;
	struct sockaddr_in server, si_other;
	int slen, recv_len;
	char buf[DEFAULT_BUFLEN];
	WSADATA wsa;

	slen = sizeof(si_other);

	if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0)
	{
		printf("Failed. Error Code : %d", WSAGetLastError());
		exit(EXIT_FAILURE);
	}

	if ((s = socket(AF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET)
	{
		printf("Could not create socket : %d", WSAGetLastError());
	}
	printf("Socket created.
");

	//Prepare the sockaddr_in structure
	server.sin_family = AF_INET;
	server.sin_addr.s_addr = INADDR_ANY;
	server.sin_port = htons(DEFAULT_PORT);

	//Bind
	if (bind(s, (struct sockaddr *)&server, sizeof(server)) == SOCKET_ERROR)
	{
		printf("Bind failed with error code : %d", WSAGetLastError());
		exit(EXIT_FAILURE);
	}
	puts("Bind done");

	printf("Waiting for data...");
	fflush(stdout);

	//clear the buffer by filling null, it might have previously received data
	memset(buf, '\0', DEFAULT_BUFLEN);

while(true)
{
if ((recv_len = recvfrom(s, buf, DEFAULT_BUFLEN, 0, (struct sockaddr *) &si_other, &slen)) == SOCKET_ERROR)
	{
		printf("recvfrom() failed with error code : %d", WSAGetLastError());
		exit(EXIT_FAILURE);
	}
}

//... STUFF



I just took some copypasta from one of my projects out of context, so not sure if it will run like that… might miss some variable declarations or whatnot.

Best regards,
Temaran

Can’t tell how thankful i am :slight_smile:

@Temaran I used socket to receive data from another end(based on UDP).I create a thread to constantly receive large amount of data.When data comes ,i will notify the blueprint to handle it.However,the program always crashed or stopped notify the blueprint after running for a while.I used delegate mechanism to notify the program.Can you help me?The following is part of the code:
MyActor.h:
class MYPROJECT617_API AMyActor : public AActor
{
GENERATED_BODY()

public:
// Sets default values for this actor’s properties
AMyActor();

// Called when the game starts or when spawned
virtual void BeginPlay() override;

// Called every frame
virtual void Tick( float DeltaSeconds ) override;

    DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FNewDataComes, FString, locationData);
UPROPERTY(BlueprintAssignable, Category = "Socket")
FNewDataComes OnDataUpdated;

};

MyActor.cpp
I create a thread in a function:
LPDWORD dwThreadID = 0;
m_hListenThread = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)StartReceiveData, this, 0, dwThreadID);

DWORD32 WINAPI StartReceiveData(LPVOID myparam)
{
AMyActor* pDlg = (AMyActor*)myparam;
if (pDlg == NULL)
return 0;
dwSendSize = sizeof(SOCKADDR_IN);
while (true)
{
char* pszRecv = new char[4096];
nRet = recvfrom(sockClient, pszRecv, 4096, 0, (SOCKADDR*)&siRemote, &(dwSendSize));
if (nRet == SOCKET_ERROR) {
cout << "recvfrom Error " << WSAGetLastError() << endl;
FString a;

		a.AppendInt(WSAGetLastError());
		//a.Append(inet_ntoa(siRemote.sin_addr));
		delete]  pszRecv;
		return -1;
	}
	else if (nRet == 0) {
		 delete]  pszRecv;
	 }
	else{
		pszRecv[nRet] = '\0';
		cout &lt;&lt; inet_ntoa(siRemote.sin_addr) &lt;&lt; endl
			&lt;&lt; pszRecv &lt;&lt; endl;

		pDlg-&gt;OnDataUpdated.Broadcast(pszRecv);
		 delete]  pszRecv;
	}
}
return 0;

}

Then in my level:

I want create process in my project, use createproc can run .exe, but i cant close it. use closeproc no help.

FInteractiveProcess and FMonitoredProcess that also test, but i dont know why it killed immediately.

so i will do.