Реализация AWS Appsync с использованием клиентской библиотеки GraphQL в .Net

Я пытаюсь реализовать подписку на синхронизацию приложений, аналогичную этому примеру Python, но в .net https://aws.amazon.com/blogs/mobile/appsync-websockets-python/

Я начал это с помощью пакета nuget GraphQL.Client https://www.nuget.org/packages/GraphQL.Client Выполнение Query/Mutation работает нормально, как указано в файле readme https://github.com/graphql-dotnet/graphql-client Но подписка не работает.

Мой код с использованием GraphQL.Client:

using var graphQLClient = new GraphQLHttpClient("https://<MY-API-PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql", new NewtonsoftJsonSerializer());

 graphQLClient.HttpClient.DefaultRequestHeaders.Add("host", "<API HOST without https or absolute path and 'realtime-' text in the api address>"); //As given in the python example

graphQLClient.HttpClient.DefaultRequestHeaders.Add("x-api-key", "<API KEY>");
var req= new GraphQLRequest
{
    Query = @"subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){  content }}",
    Variables = new{}
};

IObservable<GraphQLResponse<Response>> subscriptionStream = graphQLClient.CreateSubscriptionStream<Response>(req, (Exception ex) =>
{
      Console.WriteLine("Error: {0}", ex.ToString());
});

var subscription = subscriptionStream.Subscribe(response =>
{
                Console.WriteLine($"Response'{Newtonsoft.Json.JsonConvert.SerializeObject(response)}' ");
},
ex =>
{
Console.WriteLine("Error{0}", ex.ToString());
});

Это дает исключение «Удаленная сторона закрыла соединение WebSocket, не завершив рукопожатие».

трассировка стека:

в System.Net.WebSockets.ManagedWebSocket.d__662.MoveNext() at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter1.GetResult() в GraphQL.Client.Http.Websocket.GraphQLHttpWebSocket.d__40.MoveNext() в C:\Users\UserName\Source\repos\graphql-client\src\GraphQL. Клиент\Websocket\GraphQLHttpWebSocket.cs:строка 546

Затем я попробовал без этого nuget и с использованием стандартного веб-сокета

Код без nuget:

static public async Task CallWebsocket()
        {
            try
            {
                _client = new ClientWebSocket();
                _client.Options.AddSubProtocol("graphql-ws");
                _client.Options.SetRequestHeader("host", "<HOST URL without wss but now with 'realtime' text in api url because otherwise we are getting SSL error>");
                _client.Options.SetRequestHeader("x-api-key", "<API KEY>");

                await _client.ConnectAsync(new Uri("https://<MY-APPSYNC_API_PATH>.appsync-realtime-api.<AWS-region>.amazonaws.com/graphql"), CancellationToken.None);
                await SendCommand();
                var docList = await Receive();
            }
            catch(Exception ex)
            {

            }
        }

       static  private async Task SendCommand()
        {
            ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes("'query' : 'subscription SubscribeToEventComments{ subscribeToEventComments(eventId: 'test'){  content }}'"));
            await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
        }
        static private async Task<string> Receive()
        {
            var receiveBufferSize = 1536;
            byte[] buffer = new byte[receiveBufferSize];
            var result = await _client.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            var resultJson = (new UTF8Encoding()).GetString(buffer);
            return resultJson;
        }

Я получаю исключение ниже:

Внутреннее исключение: «Установленное соединение было прервано программным обеспечением на вашем хост-компьютере».

Внутреннее сообщение об исключении: «Невозможно прочитать данные из транспортного соединения: установленное соединение было прервано программным обеспечением на вашем хост-компьютере».

Сообщение: «Удаленная сторона закрыла соединение WebSocket, не завершив подтверждение закрытия».

Может кто поможет с правильной реализацией.


person Jack    schedule 31.05.2020    source источник


Ответы (2)


Nuget не будет работать из коробки с подписками AppSync, поэтому вам нужно будет написать для этого собственный клиентский код, как вы пытались во втором (не Nuget) примере.

Теперь, что касается второго примера, еще раз взгляните на пример Python упоминается в вашем вопросе. Есть несколько шагов, которые не включены в ваш код. Я перечислю необходимые шаги и попытаюсь перенести их на C# из кода Python (обратите внимание, что у меня нет под рукой среды C#, поэтому могут быть синтаксические ошибки, но этот код должен быть довольно близок к тому, что вам нужно)

Шаг 0. Конечные точки AppSync

Предположим, что результатом вызова aws appsync get-graphql-api --api-id example123456 для вашего API является:

{
    "graphqlApi": {
        "name": "myNewRealTimeGraphQL-API",
        "authenticationType": "<API_KEY>",
        "tags": {},
        "apiId": "example123456",
        "uris": {
            "GRAPHQL": "https://abc.appsync-api.us-west-2.amazonaws.com/graphql",
            "REALTIME": "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql"
        },
        "arn": "arn:aws:appsync:us-west-2: xxxxxxxxxxxx:apis/xxxxxxxxxxxx"
    }
}

Шаг 1. Создайте URL-адрес подключения

Шаг 2. Подключитесь к конечной точке WebSocket

Это включает в себя отправку сообщения connection_init в соответствии с протоколом, упомянутым в статье о python.

Шаг 3. Дождитесь подключения connection_ack согласно протоколу.

Опять же, это согласно протоколу

Шаг 4 - Зарегистрируйте подписку

Шаг 5 - Отправить мутацию

Этого шага нет в этом ответе, но его можно выполнить через консоль AWS

Шаг 6 - Дождитесь сообщений с данными

Это события в реальном времени, отправляемые AppSync.

Шаг 7 - Отмените регистрацию подписки

Шаг 8 - Отключить

// These are declared at the same level as your _client

// This comes from the graphqlApi.uris.GRAPHQL in step 0, set as a var here for clarity
_gqlHost  = "abc.appsync-api.us-west-2.amazonaws.com";

// This comes from the graphqlApi.uris.REALTIME in step 0, set as a var here for clarity
_realtimeUri = "wss://abc.appsync-realtime-api.us-west-2.amazonaws.com/graphql";

_apiKey = "<API KEY>";

static public async Task CallWebsocket()
{
    
    // Step 1
    // This is JSON needed by the server, it will be converted to base64
    // (note: might be better to use something like Json.NET for this task)
    var header = var test = $@"{{
        ""host"":""{_gqlHost}"",
        ""x-api-key"": ""{_apiKey}""
    }}";

    // Now we need to encode the previous JSON to base64
    var headerB64 = System.Convert.ToBase64String(
        System.Text.Encoding.UTF8.GetBytes(header));

    UriBuilder connectionUriBuilder = new UriBuilder(_realtimeUri);
    connectionUriBuilder.Query = $"header={headerB64}&payload=e30=";
    
    try
    {
        _client = new ClientWebSocket();
        _client.Options.AddSubProtocol("graphql-ws");

        // Step 2
        await _client.ConnectAsync(connectionUriBuilder.Uri), CancellationToken.None);
        // Step 3
        await SendConnectionInit();
        await Receive();
    }
    catch(Exception ex)
    {

    }
}

static  private async Task SendConnectionInit()
{
    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(@"{""type"": ""connection_init""}"));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}

static  private async Task SendSubscription()
{
    // This detail is important, note that the subscription is a stringified JSON that will be embeded in the "data" field below
    var subscription = $@"{{\""query\"": \""subscription SubscribeToEventComments{{ subscribeToEventComments{{ content }} }}\"", \""variables\"": {{}} }}";
    
    var register = $@"{{
            ""id"": ""<SUB_ID>"",
            ""payload"": {{
                ""data"": ""{subscription}"",
                ""extensions"": {{
                    ""authorization"": {{
                        ""host"": ""{_gqlHost}"",
                        ""x-api-key"":""{_apiKey}""
                    }}
                }}
            }},
            ""type"": ""start""
        }}";
        
    // The output should look like below, note again the "data" field contains a stringified JSON that represents the subscription 
    /*
    {
        "id": "<SUB_ID>",
        "payload": {
            "data": "{\"query\": \"subscription SubscribeToEventComments{ subscribeToEventComments{ content}}\", \"variables\": {} }",
            "extensions": {
                "authorization": {
                    "host": "abc.appsync-api.us-west-2.amazonaws.com",
                    "x-api-key":"<API KEY>"
                }
            }
        },
        "type": "start"
    }
    */

    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(register));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}

static  private async Task Deregister()
{
    var deregister = $@"{{
                            ""type"": ""stop"",
                            ""id"": ""<SUB_ID>""
                        }}"
    ArraySegment<byte> outputBuffer = new ArraySegment<byte>(Encoding.UTF8.GetBytes(deregister));
    await _client.SendAsync(outputBuffer, WebSocketMessageType.Text, true, CancellationToken.None);
}

static private async Task Receive()
{
    while (_socket.State == WebSocketState.Open)
    {
        ArraySegment<Byte> buffer = new ArraySegment<byte>(new Byte[8192]);
        WebSocketReceiveResult result= null;
        using (var ms = new MemoryStream())
        {
            // This loop is needed because the server might send chunks of data that need to be assembled by the client
            // see: https://stackoverflow.com/questions/23773407/a-websockets-receiveasync-method-does-not-await-the-entire-message
            do
            {
                result = await socket.ReceiveAsync(buffer, CancellationToken.None);
                ms.Write(buffer.Array, buffer.Offset, result.Count);
            }
            while (!result.EndOfMessage);

            ms.Seek(0, SeekOrigin.Begin);

            using (var reader = new StreamReader(ms, Encoding.UTF8))
            {
                // convert stream to string
                var message = reader.ReadToEnd();
                Console.WriteLine(message)
                // quick and dirty way to check response
                if (message.Contains("connection_ack"))
                {
                    // Step 4
                    await SendSubscription();
                } else if (message.Contains("data"))  // Step 6
                {
                    // Step 7 
                    await Deregister();
                    // Step 8
                    await _client.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
                }
            }
        }
    }
}

person Angel Velazquez    schedule 02.06.2020
comment
Спасибо, что нашли время и помогли мне. За последние два дня я снова написал код с нуля, используя эту документацию docs.aws.amazon.com/appsync/latest/devguide/. Соединения Websocket, подтверждение и обработка данных работают. Но когда я вижу ваш код, я вижу несколько стандартов кодирования, которые я пропустил в том, как я обрабатываю отмену регистрации и отключение. Я исправлю их. - person Jack; 02.06.2020

Для тех, кто сталкивается с такими же проблемами, я создал пакет nuget. https://www.nuget.org/packages/DotNetCSharp.AWS.AppSync.Client/1.1.1 Вы можете использовать его, как показано ниже.

//Create Client Specify eithen APIKey or AuthToken
var Client = new AppSyncClient("<Appsync URL>", new AuthOptions()
{
// APIKey = "<API Key>",
AuthToken = "<JWT Token>"
});

//To Subscribe an query
Guid newId = Guid.NewGuid();
await Client.CreateSubscriptionAsync<Message>(new QueryOptions()
{
Query = "subscription <Subscription Query>",
SubscriptionId = newId
},
(data) =>
{

});

//To unsubscribe an subscription
await Client.UnSubscribe(newId);

//To close the websocket
await Client.Close();
person Jack    schedule 02.06.2020