什麼是發布-訂閱 (Publish & Subscribe)

發布-訂閱模式允許服務之間使用 Message 進行溝通,常用在微服務之間的通訊、高併發環境...等。生產者將 Message 發送至 Topic,並且不知道哪個應用程式會接收 Message 並進行處理;消費者將訂閱該 Topic ,並且不知道誰產生了 Message,只負責接收 Message 並進行處理。中間的消息代理者 (Message broker) 則負責將生產的訊息傳送至有訂閱的消費者。




Dapr Pub-Sub 運作原理

Dapr Pub-Sub building block 提供了抽象平台的 API 來發布與接收 Message,你的應用程式將會將消息發布至一個 Named Topic,並且進行訂閱來消費 Message。在 Dapr 架構下,應用程式是透過 Sidecar 方式使用 Pub-Sub 元件。Pub-Sub 元件內封裝了一個 Message Broken 服務,Dapr 協助您的應用程式進行訂閱此 Pub-Sub,並在收到 Message 時將其發送至應用程式。

Dapr 保證 Message 採用 at-least-once,這意味每個訂閱者至少收到一次



您可以在 C:\Users\<您的使用者帳戶>\.dapr\components 找到 pubsub.yaml,可以看見預設使用 Redis 作為 Pub-Sub 元件





Dapr 發布-訂閱格式為  CloudEvents 1.0 ,所以透過 Dapr 應用程式發送的 Message 會自動打包成  Cloud Events 格式,datacontenttype 属性使用 Content-Type。範例如下






訂閱主題

Dapr 有兩種訂閱主題 (擇一即可)

  1. 宣告方式 (Declaratively)
  2. 程式方式 (Programmatically)


宣告方式在 C:\Users\<您的使用者帳戶>\.dapr\components 內建立 subscription.yaml,定義訂閱主題:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: topicStatus
route: /tstatus
pubsubname: pubsub
scopes:
- client



以上面範例進行說明:

topic: 主題為 topicStatus

pubsubname: 對應的 pubsub 元件,這裡設定為 pubsub

route: 收到訊息時會將主題發送至應用程式的 /tstatus

scope: 表示 app-id 為 client 的應用程式啟用訂閱



下列式以程式方式(C#) 方式訂閱,我們會將此程式放在 controller,主要重點在於 topic 內需要 pubsubname 與 topic、 route 的設定。

[Topic("pubsub","topicStatus")]
[HttpPost("tstatus")]
public async Task<ActionResult> Sub()
{
Stream stream = Request.Body;
byte[] buffer = new byte[Request.ContentLength.Value];
stream.Position = 0L;
stream.ReadAsync(buffer, 0, buffer.Length);
string content = Encoding.UTF8.GetString(buffer);
_logger.LogInformation("topicStatus" + content);
return Ok(content);
}
view raw subpub.cs hosted with ❤ by GitHub


並且在 Startup.cs 內 Configure 內加入 app.UseCloudEvents(); ,並找到 app.UseEndpoints,在內部加上 endpoints.MapSubscribeHandler();






實作 Pub & Sub

此時做事延續 Dapr 服務調用 (Service invocation) 進階篇,建議您先完成


步驟 1. 右鍵點選 Controller > 點選加入 > 控制器 > 選擇空白的 API 控制器







步驟 2. 名稱輸入 PubSubController





步驟 3. 貼上下列程式碼,並進行建置 (Build)

using Dapr;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System.IO;
using System.Text;
using System.Threading.Tasks;
namespace DaprClientDemo.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class PubSubController : ControllerBase
{
private readonly ILogger<PubSubController> _logger;
public PubSubController(ILogger<PubSubController> logger)
{
_logger = logger;
}
[Topic("pubsub","topicStatus")]
[HttpPost("tstatus")]
public async Task<ActionResult> Sub()
{
Stream stream = Request.Body;
byte[] buffer = new byte[Request.ContentLength.Value];
stream.Position = 0L;
stream.ReadAsync(buffer, 0, buffer.Length);
string content = Encoding.UTF8.GetString(buffer);
_logger.LogInformation("topicStatus" + content);
return Ok(content);
}
}
}




步驟 4. 以管理者身分開啟命令提示字元,輸入指令

dapr run --app-id client --app-port 5200 --dapr-http-port 3501 -- dotnet DaprClientDemo\bin\Debug\netcoreapp3.1\DaprClientDemo.dll --urls=http://localhost:5200




步驟 5. 我們也可以透過 dapr CLI 來發布消息,另開命令提示字元,輸入下列指令

dapr publish --publish-app-id client --pubsub pubsub --topic topicStatus --data '{"date":"0001-01-01T00:00:00","test0":0,"test1":1,"test2":2}'





步驟 6. 可以從啟動 Client 服務的視窗中看見 log,表示有收到 Message