创建消息队列并向队列中添加消息的代码:
// Creates the "mymessage090120" queue if it does not exist yet and enqueues
// the current text of InputBox as a new message, then clears the input box.
// QueueSvc, queue and InputBox are declared outside this snippet.
StorageAccountInfo account = StorageAccountInfo.GetDefaultQueueStorageAccountFromConfiguration();

QueueSvc = QueueStorage.Create(account);
queue = QueueSvc.GetQueue("mymessage090120");

// Guard before the first dereference: the original listing tested queue for
// null only after it had already called queue.DoesQueueExist() on it.
if (queue != null)
{
    // Create the queue on first use so that PutMessage has a target.
    if (!queue.DoesQueueExist())
    {
        queue.CreateQueue();
    }

    Message msg = new Message(InputBox.Text);
    queue.PutMessage(msg);

    // Reset the input box once the message has been handed to the queue.
    InputBox.Text = "";
}
刷新功能,主要是使用PeekMessages方法来读取消息队列:
// Refresh the list: drop the currently displayed entries, then show the text
// of up to 10 messages obtained from queue.PeekMessages.
// MsgBox and queue are declared outside this snippet.
MsgBox.Items.Clear();

IEnumerable<Message> peeked = queue.PeekMessages(10);

// Nothing is added when PeekMessages hands back null.
if (peeked != null)
{
    foreach (Message entry in peeked)
    {
        string text = entry.ContentAsString();
        MsgBox.Items.Add(text);
    }
}
Worker Role 中处理消息的代码:
/// <summary>
/// Worker role entry point. Builds the storage account from the
/// "QueueStorageEndpoint", "AccountName" and "AccountSharedKey" configuration
/// settings, then polls the message queue forever: every 10 seconds it takes
/// at most one message, writes its content to the role log and deletes it.
/// </summary>
/// <remarks>
/// This method never returns; the polling loop has no exit condition.
/// </remarks>
public override void Start()
{
    // Must match the queue the web role writes to; the original listing
    // passed an empty string here, which does not name any queue.
    const string queueName = "mymessage090120";

    // Pause between two polls of the queue, in milliseconds.
    const int pollIntervalMilliseconds = 10000;

    // Initialize the account information.
    Uri baseUri = new Uri(RoleManager.GetConfigurationSetting("QueueStorageEndpoint"));
    string accountName = RoleManager.GetConfigurationSetting("AccountName");
    string accountKey = RoleManager.GetConfigurationSetting("AccountSharedKey");

    // NOTE(review): the second argument is left null as in the original;
    // its meaning is not visible from this snippet - confirm against StorageAccountInfo.
    StorageAccountInfo account = new StorageAccountInfo(
        baseUri,
        null,
        accountName,
        accountKey);

    // Retrieve a reference to the messages queue.
    QueueStorage service = QueueStorage.Create(account);
    MessageQueue queue = service.GetQueue(queueName);

    while (true)
    {
        Thread.Sleep(pollIntervalMilliseconds);

        // The queue is created by the producer, so it may not exist yet.
        if (queue.DoesQueueExist())
        {
            Message msg = queue.GetMessage();

            if (msg != null)
            {
                RoleManager.WriteToLog("Information",
                    string.Format("Message '{0}' processed.", msg.ContentAsString()));

                // Delete only after the message has been logged.
                queue.DeleteMessage(msg);
            }
        }
    }
}