接下来的一个步骤是打开WorkerRole.cs文件,添加相应代码来从Queue存储器检索消息,进行计算,并更新Table存储器中的内容。为此,需要先将WorkerRole.cs文件中的Start方法添加如下所示的代码。
WorkerRole.cs
using Microsoft.Samples.ServiceHosting.StorageClient;
// ...
public override void Start()
{
RoleManager.WriteToLog("Information", "Worker Process entry point called");
// Create account info objects for retrieving messages and accessing table storage
StorageAccountInfo accountTableInfo = StorageAccountInfo.GetDefaultTableStorageAccountFromConfiguration();
StorageAccountInfo accountQueueInfo = StorageAccountInfo.GetDefaultQueueStorageAccountFromConfiguration();
CalculationDataServiceContext calcDSContext = new CalculationDataServiceContext(accountTableInfo);
QueueStorage qs = QueueStorage.Create(accountQueueInfo);
MessageQueue queue = qs.GetQueue("calculations");
while (true)
{
Thread.Sleep(10000);
if (queue.DoesQueueExist())
{
Message msg = queue.GetMessage();
if (msg != null)
{
// Retrieve the Calculation entity from Table storage
var calc = (from c in calcDSContext.Calculations
where c.PartitionKey == "TEST"
&& c.RowKey == msg.ContentAsString()
select c).FirstOrDefault();
if (calc != null)
{
calc.CalcValue = calc.ValueOne + calc.ValueTwo;
// Update the entity with the new CalcValue
// property populated
calcDSContext.UpdateObject(calc);
calcDSContext.SaveChanges(SaveChangesOptions.ReplaceOnUpdate);
// Need to delete the message once processed!
queue.DeleteMessage(msg);
}
}
}
}
}
using Microsoft.Samples.ServiceHosting.StorageClient;
// ...
public override void Start()
{
RoleManager.WriteToLog("Information", "Worker Process entry point called");
// Create account info objects for retrieving messages and accessing table storage
StorageAccountInfo accountTableInfo = StorageAccountInfo.GetDefaultTableStorageAccountFromConfiguration();
StorageAccountInfo accountQueueInfo = StorageAccountInfo.GetDefaultQueueStorageAccountFromConfiguration();
CalculationDataServiceContext calcDSContext = new CalculationDataServiceContext(accountTableInfo);
QueueStorage qs = QueueStorage.Create(accountQueueInfo);
MessageQueue queue = qs.GetQueue("calculations");
while (true)
{
Thread.Sleep(10000);
if (queue.DoesQueueExist())
{
Message msg = queue.GetMessage();
if (msg != null)
{
// Retrieve the Calculation entity from Table storage
var calc = (from c in calcDSContext.Calculations
where c.PartitionKey == "TEST"
&& c.RowKey == msg.ContentAsString()
select c).FirstOrDefault();
if (calc != null)
{
calc.CalcValue = calc.ValueOne + calc.ValueTwo;
// Update the entity with the new CalcValue
// property populated
calcDSContext.UpdateObject(calc);
calcDSContext.SaveChanges(SaveChangesOptions.ReplaceOnUpdate);
// Need to delete the message once processed!
queue.DeleteMessage(msg);
}
}
}
}
}
现在,我们已经获得了一个具备一定功能的Worker Role了,即每隔10秒它会检查一次“calculation”队列中的消息。当它发现一个消息时,它会从Table存储器检索相应的计算实体,更新计算结果,并将新的结果写回Table存储器。最后一件事情,也是最重要的一件事情是从队列中删除消息,以防止该信息被重复索引。