Когда будет запущен метод ProcessEventsAsync (контекст PartitionContext, сообщения ienumerable‹EventData›)

В настоящее время я работаю над Интернетом вещей, в моем текущем проекте я создал проект One Azure Cloud Service, в котором я создал рабочую роль, внутри рабочей роли я написал ниже строки кода.

 public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);

    private static string connectionString;
    private static string eventHubName;
    public static ServiceClient iotHubServiceClient { get; private set; }
    public static EventHubClient eventHubClient { get; private set; }

    public override void Run()
    {
        Trace.TraceInformation("EventsForwarding Run()...\n");

        try
        {
            this.RunAsync(this.cancellationTokenSource.Token).Wait();
        }
        finally
        {
            this.runCompleteEvent.Set();
        }
    }

    public override bool OnStart()
    {
        // Set the maximum number of concurrent connections
        ServicePointManager.DefaultConnectionLimit = 12;

        // For information on handling configuration changes
        // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

        bool result = base.OnStart();

        Trace.TraceInformation("EventsForwarding OnStart()...\n");

        connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
        eventHubName = ConfigurationManager.AppSettings["Microsoft.ServiceBus.EventHubName"];

        string storageAccountName = ConfigurationManager.AppSettings["AzureStorage.AccountName"];
        string storageAccountKey = ConfigurationManager.AppSettings["AzureStorage.Key"];
        string storageAccountString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
            storageAccountName, storageAccountKey);

        string iotHubConnectionString = ConfigurationManager.AppSettings["AzureIoTHub.ConnectionString"];
        iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString);
        eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

        var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();

        string eventProcessorHostName = "SensorEventProcessor";
        EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, defaultConsumerGroup.GroupName, connectionString, storageAccountString);
        eventProcessorHost.RegisterEventProcessorAsync<SensorEventProcessor>().Wait();

        Trace.TraceInformation("Receiving events...\n");

        return result;
    }

    public override void OnStop()
    {
        Trace.TraceInformation("EventsForwarding is OnStop()...");

        this.cancellationTokenSource.Cancel();
        this.runCompleteEvent.WaitOne();

        base.OnStop();

        Trace.TraceInformation("EventsForwarding has stopped");
    }

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            //Trace.TraceInformation("EventsToCommmandsService running...\n");
            await Task.Delay(1000);

        }
    }
}

Затем я написал следующие строки кода в SensorEventProcessor для получения сообщений из концентратора событий и отправки этих сообщений в концентратор IoT.

class SensorEventProcessor : IEventProcessor
{
    Stopwatch checkpointStopWatch;
    PartitionContext partitionContext;

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Trace.TraceInformation(string.Format("EventProcessor Shuting Down.  Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString()));
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        Trace.TraceInformation(string.Format("Initializing EventProcessor: Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset));
        this.partitionContext = context;
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        Trace.TraceInformation("\n");
        Trace.TraceInformation("........ProcessEventsAsync........");
        //string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
        //await WorkerRole.iotHubServiceClient.SendAsync("astranidevice", new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
        foreach (EventData eventData in messages)
        {
            try
            {
                string jsonString = Encoding.UTF8.GetString(eventData.GetBytes());

                Trace.TraceInformation(string.Format("Message received at '{0}'. Partition: '{1}'",
                    eventData.EnqueuedTimeUtc.ToLocalTime(), this.partitionContext.Lease.PartitionId));

                Trace.TraceInformation(string.Format("-->Raw Data: '{0}'", jsonString));

                SimpleTemperatureAlertData newSensorEvent = this.DeserializeEventData(jsonString);

                Trace.TraceInformation(string.Format("-->Serialized Data: '{0}', '{1}', '{2}', '{3}', '{4}'",
                    newSensorEvent.Time, newSensorEvent.RoomTemp, newSensorEvent.RoomPressure, newSensorEvent.RoomAlt, newSensorEvent.DeviceId));

                // Issuing alarm to device.
                string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}";
                Trace.TraceInformation("Issuing alarm to device: '{0}', from sensor: '{1}'", newSensorEvent.DeviceId, newSensorEvent.RoomTemp);
                Trace.TraceInformation("New Command Parameter: '{0}'", commandParameterNew);
                await WorkerRole.iotHubServiceClient.SendAsync(newSensorEvent.DeviceId, new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew)));
            }
            catch (Exception ex)
            {
                Trace.TraceInformation("Error in ProssEventsAsync -- {0}\n", ex.Message);
            }
        }

        await context.CheckpointAsync();
    }
    private SimpleTemperatureAlertData DeserializeEventData(string eventDataString)
    {
        return JsonConvert.DeserializeObject<SimpleTemperatureAlertData>(eventDataString);
    }

}

Когда я отлаживал свой код, метод ProcessEventsAsync (контекст PartitionContext, сообщения IEnumerable) никогда не вызывался и просто входил в метод OpenAsync(), после чего прекращал отладку.

Пожалуйста, скажите мне, где я сделал ошибку в своем проекте, и скажите мне, когда вызовет метод ProcessEventsAsync().

С уважением,

Прадип


person Pradeep    schedule 06.05.2016    source источник


Ответы (1)


IEventProcessor.ProcessEventsAsync вызывается при наличии необработанных сообщений в EventHub.

Концентратор событий содержит несколько разделов. Раздел — это упорядоченная последовательность событий. Внутри раздела каждое событие включает смещение. Это смещение используется потребителями (IEventProcessor) для отображения местоположения в последовательности событий для данного раздела. Когда IEventProcessor подключается (EventProcessorHost.RegisterEventProcessorAsync), он передает это смещение в концентратор событий, чтобы указать расположение, с которого следует начать чтение. При наличии необработанных сообщений (событий с более высоким смещением) они доставляются в IEventProcessor. Контрольные точки используются для сохранения смещения обработанных сообщений (PartitionContext.CheckpointAsync).

Вы можете найти подробную информацию о внутреннем устройстве EventHub: Azure Event Hubs обзор

Отправляли ли вы какие-либо сообщения в EventHub (EventHubClient.SendAsync(EventData))?

person Attila Cseh    schedule 07.05.2016
comment
да, но не используя метод SendAsync() для отправки сообщений в концентратор событий. в моем требовании я установил выходное задание как концентратор событий в потоковой аналитике, потоковая аналитика отправит значения в концентратор событий, в этом я использовал ввод в качестве концентратора IoT. - person Pradeep; 09.05.2016
comment
Не могли бы вы подтвердить через портал Azure, что EventHub недавно получил сообщения? - person Attila Cseh; 09.05.2016
comment
Я думаю, что он получен, но на панели инструментов моего лазурного портала не отображаются сообщения концентратора событий. - person Pradeep; 09.05.2016
comment
Если вы не видите никаких входящих сообщений на информационной панели EventHub, проверьте входящие события на информационной панели Stream Analytics. Я подозреваю, что события вообще не отправляются. - person Attila Cseh; 09.05.2016
comment
спасибо за ваше руководство, я сделал ошибку в запросе потоковой аналитики. - person Pradeep; 10.05.2016