using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net.Http; using System.Net.Http.Headers; using System.Security.Cryptography; using System.Text; using System.Threading.Tasks; using Newtonsoft.Json; namespace MQWebCore { public class MQHelper { string URL = " http://publictest-rest.ons.aliyun.com "; string topic, secretKey, accessKey; public MQHelper( string topic, string secretKey, string accessKey) { this.topic = topic; this.secretKey = secretKey; this.accessKey = accessKey; } /// <summary> /// URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可 /// </summary> /// <param name="tag"></param> /// <param name="key"></param> /// <param name="body"></param> /// <returns></returns> public async Task< bool> Pub( string tag, string key, string body) { using (HttpClient httpClient = new HttpClient()) { httpClient.DefaultRequestHeaders.Connection.Add( " keep-alive "); HttpContent content = new StringContent(body, Encoding.UTF8); httpClient.DefaultRequestHeaders.Accept.Add( new MediaTypeWithQualityHeaderValue( " text/html ")); var time = ( long)(DateTime.Now.ToUniversalTime() - new DateTime( 1970, 1, 1)).TotalMilliseconds; var signString = Sign( string.Format( " {0}\nPID_{0}\n{1}\n{2} ", topic, MD5Encrypt(body), time), secretKey); httpClient.DefaultRequestHeaders.Add( " AccessKey ", accessKey); httpClient.DefaultRequestHeaders.Add( " Signature ", signString); httpClient.DefaultRequestHeaders.Add( " ProducerID ", string.Format( " PID_{0} ", topic)); var url = URL + " /message/?topic= " + topic + " &time= " + time + " &tag= " + tag + " &key= " + key; var res = await httpClient.PostAsync(url, content); if (res.StatusCode == System.Net.HttpStatusCode.Created) { return true; } return false; } } public async void Subscribe( string tag = " * ") { using (HttpClient httpClient = new HttpClient()) { httpClient.DefaultRequestHeaders.Connection.Add( " keep-alive "); httpClient.DefaultRequestHeaders.Add( " Accept-Charset ", " utf-8 "); var time = ( long)(DateTime.Now.ToUniversalTime() - new DateTime( 1970, 1, 1)).TotalMilliseconds; var signString = Sign( string.Format( " {0}\nCID_{0}\n{1} ", topic, time), secretKey); httpClient.DefaultRequestHeaders.Add( " AccessKey ", accessKey); httpClient.DefaultRequestHeaders.Add( " Signature ", signString); httpClient.DefaultRequestHeaders.Add( " ConsumerID ", string.Format( " CID_{0} ", topic)); var url = URL + " /message/?topic= " + topic + " &time= " + time + " &num=32 "; var res = httpClient.GetAsync(url).GetAwaiter().GetResult(); Console.WriteLine(res.StatusCode); if (res.StatusCode == System.Net.HttpStatusCode.OK) { var msg = await res.Content.ReadAsStringAsync(); Console.WriteLine(msg); if (msg != null && msg.Length > 10) { MQMessage[] mqMsgs = JsonConvert.DeserializeObject<MQMessage[]>(msg); foreach ( var mqMsg in mqMsgs) { Delete(mqMsg.msgHandle); } } } } } async void Delete( string msgHandle) { using (HttpClient httpClient = new HttpClient()) { httpClient.DefaultRequestHeaders.Accept.Add( new MediaTypeWithQualityHeaderValue( " text/html ")); var time = ( long)(DateTime.Now.ToUniversalTime() - new DateTime( 1970, 1, 1)).TotalMilliseconds; var signString = Sign( string.Format( " {0}\nCID_{0}\n{1}\n{2} ", topic, msgHandle, time), secretKey); httpClient.DefaultRequestHeaders.Add( " AccessKey ", accessKey); httpClient.DefaultRequestHeaders.Add( " Signature ", signString); httpClient.DefaultRequestHeaders.Add( " ConsumerID ", string.Format( " CID_{0} ", topic)); var url = URL + " /message/?topic= " + topic + " &time= " + time + " &msgHandle= " + msgHandle; var res = await httpClient.DeleteAsync(url); if (res.StatusCode == System.Net.HttpStatusCode.NoContent) { Console.WriteLine( " 消息删除成功,无需返回内容 "); } else { Console.WriteLine(res.StatusCode); } } } string MD5Encrypt( string strText) { using ( var md5 = MD5.Create()) { var result = md5.ComputeHash(Encoding.UTF8.GetBytes(strText)); return BitConverter.ToString(result).Replace( " - ", "").ToLower(); } } string Sign( string signatureString, string secretKey, bool isRaw = true) { var enc = Encoding.UTF8; HMACSHA1 hmac = new HMACSHA1(enc.GetBytes(secretKey)); hmac.Initialize(); byte[] buffer = enc.GetBytes(signatureString); if (isRaw) { byte[] ret = hmac.ComputeHash(buffer); return Convert.ToBase64String(ret); } else { string res = BitConverter.ToString(hmac.ComputeHash(buffer)).Replace( " - ", "").ToLower(); return Convert.ToBase64String(Encoding.UTF8.GetBytes(res)); } } } public class MQMessage { public string body; public string bornTime; public string msgHandle; public string msgId; public long reconsumeTimes; public string tag; }
使用:
using MQWebCore; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { public class Program { public static void Main( string[] args) { Console.OutputEncoding = System.Text.Encoding.UTF8; // Encoding.RegisterProvider(CodePagesEncodingProvider.Instance); MQHelper mqHelper = new MQHelper( " Test ", "3412qsd's12 ", "3412341212 "); var res = mqHelper.Pub("testTag", "testKey", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss") + "阿特斯地方").GetAwaiter().GetResult(); Debug.WriteLine(res); while ( true) { mqHelper.Subscribe(); Thread.Sleep( 1000); } Console.Read(); } } }