EOS: verify user identity in game backend REST API

Hi, in my game users login via Epic Online Subsystem, granting my game access to information about their epic games account. After that they are able to use EOS sessions, lobbies, etc.

Now I want the game to communicate with the progression backend, written in python. With it’s requests to this API, the game would need to send user id and some kind of token, which could prove (with a call to EOS API) that this user is really logged in via EOS.

eg

import requests

# On backend we received these values with the HTTP request from the game
received_user_id = "1234"
received_user_token = "this is a real token"

r = requests.post("https://eos-api.com/validate_user_token", json={"id": received_user_id, "token": received_user_token})
is_authenticated = r.json()["is_authenticated"]
# If is_authenticated, proceed with the API call execution, otherwise throw 401

I guess that [FUserOnlineAccount::GetAccessToken()](
GetAccessToken()
is used to obtain token in game, and FOnlineUser::GetUserId is used to obtain id, but how do I validate them on server, is there any documentation about it and examples?

This page contains documentation for validating access tokens, either online

POST /epic/oauth/v2/tokenInfo HTTP/1.1
Host: api.epicgames.dev
Content-Type: application/x-www-form-urlencoded
token=mytoken

or offline, validating signature in the JWT, which is the access token itself.

https://dev.epicgames.com/docs/web-api-ref/authentication

Here is a Python class to verify EOS client id and access token on backend.

"""
Checks EOS authentication token according to the documentation:
https://dev.epicgames.com/docs/epic-account-services/auth/auth-interface
Validating ID Tokens on Backend Without SDK

To retrieve epic account id with EOS in UE, use
IOnlineSubsystem::Get()->GetIdentityInterface()->GetUniquePlayerId(0).ToString()
then extract part before "|".

To retrieve auth token in UE, use
IOnlineSubsystem::Get()->GetIdentityInterface()->GetAuthToken(0)
"""

import time
import traceback

import requests
import jwt
import json
import os
import sys


class EOSAuthVerifier:
    def __init__(self):
        self.public_keys = {}

        if os.path.exists("/tmp/"):
            self.eos_keys_fp = "/tmp/eos_keys.json"
        else:
            self.eos_keys_fp = ""

        if os.path.exists(self.eos_keys_fp):
            with open(self.eos_keys_fp, "r") as f:
                latest_key_data_cache = json.load(f)
        else:
            latest_key_data_cache = {
                "keys": [
                    {
                        "kty": "RSA",
                        "e": "AQAB",
                        "kid": "WMS7EnkIGpcH9DGZsv2WcY9xsuFnZCtxZjj4Ahb-_8E",
                        "n": "l6XI48ujknQQlsJgpGXg4l2i_DuUxuG2GXTzkOG7UtX4MqkVBCfW1t1JIIc8q0kCInC2oBwhC599ZCmd-cOi0kS7Aquv68fjERIRK9oCUnF_lJg296jV8xcalFY0FOWX--qX3xGKL33VjJBMIrIu7ETjj06s-v4li22CnHmu2lDkrp_FPTVzFscn-XRIojqIFb7pKRFPt27m12FNE_Rd9bqlVCkvMNuE7VTpTOrSfKk5B01M5IuXKXk0pTAWnelqaD9bHjAExe2I_183lp_uFhNN4hLTjOojxl-dK8Jy2OCPEAsg5rs9Lwttp3zZ--y0sM7UttN2dE0w3F2f352MNQ"
                    }
                ]
            }

        self.__refresh_keys_from_key_data(latest_key_data_cache)

        self.issuer_start_string = "https://api.epicgames.dev"
        # Set environmental variable according to your EOS app Client Id
        # If you did set up EOS correctly, it will be in Project Settings, Plugins, Online Subsystem EOS, Artifacts
        self.client_id = os.getenv("EOS_CLIENT_ID")

    def __refresh_keys_from_key_data(self, key_data):
        self.public_keys = {}
        for jwk in key_data["keys"]:
            kid = jwk['kid']
            self.public_keys[kid] = jwt.algorithms.RSAAlgorithm.from_jwk(json.dumps(jwk))

    def __drop_cached_keys_and_get_latest(self):
        r = requests.get("https://api.epicgames.dev/epic/oauth/v2/.well-known/jwks.json")
        latest_key_data = r.json()

        if self.eos_keys_fp:
            with open(self.eos_keys_fp, "w") as f:
                json.dump(latest_key_data, f)
        self.__refresh_keys_from_key_data(latest_key_data)

    def validate_token(self, account_id, token):
        try:
            header_data = jwt.get_unverified_header(token)

            alg = header_data.get("alg")
            kid = header_data.get("kid")

            # Step 1: alg is present and not None
            if not alg:
                raise ValueError("Alg header is None")
            if not kid:
                raise ValueError("Kid header is None")

            if kid not in self.public_keys:
                self.__drop_cached_keys_and_get_latest()
                if kid not in self.public_keys:
                    raise ValueError(f"Kid {kid} not found in keys")

            public_key = self.public_keys[kid]
            # Here it checks key (step 2), expiration (step 5), audience (step 6)
            payload = jwt.decode(
                token,
                key=public_key,
                algorithms=[header_data['alg']],
                audience=self.client_id,
            )

            # Check issuer (step 3)
            issuer = payload.get("iss", "")
            if not issuer.startswith(self.issuer_start_string):
                raise ValueError(f"Bad issuer: {issuer}")

            # Check that iat is in the past (step 4)
            iat = payload.get("iat")
            if not isinstance(iat, int):
                raise ValueError(f"Non-number iat: {iat}")
            if not (iat < time.time()):
                raise ValueError(f"Iat is not before current time: {iat}")

            # Check that account in token is same as specified in request
            token_account_id = payload.get("sub", "")
            if account_id != token_account_id:
                raise ValueError(f"Account id {account_id} didn't match with token account id {token_account_id}")

            return True

        except Exception as e:
            print(traceback.format_exc())
            return False


if __name__ == '__main__':
    s = EOSAuthVerifier()
    with open("../eos_token_example.txt", "r") as f:
        token = f.read()
    with open("../eos_token_account_id.txt", "r") as f:
        account_id = f.read()
    print(s.validate_token(account_id, token))