前言

另一個新的 C# 8.0 的語法為非同步資料流 (Asynchronous streams),除了允許非同步方法回傳多筆資料,也提供資料接收者控制非同步資料來源一個好的方式,以強化程式響應 (responsiveness) 能力。本篇文章將以簡單範例說明非同步資料流使用方法,若有錯誤或任何建議,請各位先進不吝提出。




介紹

在提到非同步資料流之前,我們先簡單談一下 Pull programming model 與 Push programming model 兩種模組,以及適合的情境。以下圖為例,分別為 使用者要求資料 與 推送給使用者資料兩種模式:


使用者(Consumer) 請求取得資料模式 (上圖) 較適用於 資料來源 (Producer) 生成資料速度較快,使用者處理速度較慢的情境;推送給使用者資料模式較適用於 Producer 生成資料速度較慢,Consumer 處理速度較快的情境。而非同步資料流的部分為後者。

我們撰寫一隻 Producer 程式,每 0.5 秒會推回多筆員工資料給 Consumer,程式碼如下。
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AsynchronousStreamExample
{
    class EmployeeInfo 
    {
        public int Id { get; set; }
    }

    class Producer
    {
        public async IAsyncEnumerable<EmployeeInfo> GetEmployeeInfosAsync(int count)
        {
            for (int i = 0; i < count; i++)
            {
                await Task.Delay(500);
                yield return new EmployeeInfo(){Id = i +1};
            }
        }
    }
}


在 C# 8.0 我們可以透過非同步資料流方式接收資料(如 17 行)。另外,透過 第 9 行 ThreadId 也可以檢視非同步的情況。程式碼如下:
using System;
using System.Threading;
using System.Threading.Tasks;

namespace AsynchronousStreamExample
{
    class Program
    {
        private static int ThreadId => Thread.CurrentThread.ManagedThreadId;

        static async Task Main(string[] args)
        {
            var producer = new Producer();
            
            Console.WriteLine($"({ThreadId})取得員工資料");

            await foreach (var employee in producer.GetEmployeeInfosAsync(20))
            {
                Console.WriteLine($"({ThreadId})開始接收資料: {employee.Id}");
            }

            Console.WriteLine($"({ThreadId})接收資料完畢");
        }
    }
}


執行結果如下圖,每 0.5 秒推送一筆員工資料。由此得知,非同步資料流是相當不錯推送技術。

若您對於這個飯粒有興趣,你可以在我的 GitHub 找到 Asynchronous streams example (.NET Core 3.0)。

參考資料

1. Async Streams in C# 8 -infoQ