技术开发 频道

F#异步及并行模式中的轻量级代理

在F#中,异步代理的独立性直接表现为文法上的作用域。例如,下面的代码使用一个字典来累计发送至代理对象的不同消息的次数。内部的字典在文法上是异步代理私有的,因此我们无法在代理外部对字典进行读写。这意味着字典的可变状态实际上是被隔离的,代理保证了对它的非并发的安全访问。

type Agent<'T> = MailboxProcessor<'T> open System.Collections.Generic let agent = Agent.Start(fun inbox -> async { let strings = Dictionary() while true do let! msg = inbox.Receive() if strings.ContainsKey msg then strings.[msg] <- strings.[msg] + 1 else strings.[msg] <- 0 printfn "message '%s' now seen '%d' times" msg strings.[msg] } )

 

  状态隔离是F#的基本特性,它并不是代理所独有的。例如,下面的代码对StreamReader和ResizeArray(这是F#中对System.Collections.Generics.List的别称)的隔离访问。请注意这段代码和.NET类库中的System.IO.File.ReadAllLines方法功能相同:

let readAllLines (file:string) = use reader = new System.IO.StreamReader(file) let lines = ResizeArray<_>() while not reader.EndOfStream do lines.Add (reader.ReadLine()) lines.ToArray()

在这里,reader和ResizeArray对象都无法在函数外部使用。在代理或其他持续计算的情况里,隔离性是至关重要的──状态在这里永远独立于程序运行中的其他部分。

  说到底,隔离性是个动态的属性,它经常受到文法的约束。例如,考虑这样一个延迟的,随需加载的读取器,它会读取文件中的所有行:

let readAllLines (file:string) = seq { use reader = new System.IO.StreamReader(file) while not reader.EndOfStream do yield reader.ReadLine() }

隔离状态经常包含可变的值,包括F#中的引用单元。例如,下面的代码在一个引用单元中不断累计接受到的消息个数:

let agent = Agent.Start(fun inbox -> async { let count = ref 0 while true do let! msg = inbox.Receive() incr count printfn "now seen a total of '%d' messages" !count } )

 再次强调,这里可变的状态被隔离了,确保了对它单线程的安全访问。

  在代理中使用循环和隔离状态(函数式)

  F#程序员了解两种实现循环的方法:使用“命令式”的while/for以及可变的累加器(ref或mutable),或是“函数式”风格的调用,将累加状态作为参数传递给一个或多个递归函数。例如,计算文件行数的程序可以使用命令式的方式来写:

let countLines (file:string) = use reader = new System.IO.StreamReader(file) let count = ref 0 while not reader.EndOfStream do reader.ReadLine() |> ignore incr count !count或是递归式的: let countLines (file:string) = use reader = new System.IO.StreamReader(file) let rec loop n = if reader.EndOfStream then n else reader.ReadLine() |> ignore loop (n+1)

loop 0 在使用时,两种方式都是可行的:函数式的做法相对更加高级一些,但是大大减少了代码中显式的状态修改次数,且更为通用。两种写法对于F#程序员来说,一般都可以理解,他们还可以将“while”循环转化为等价的“let rec”函数(这是个不错的面试问题!)。

  有趣的是,在编写异步循环时的规则也一样:您可以使用命令式的“while”或函数式的“let rec”中的任意一种来定义循环。例如,这里有一个利用递归的异步函数统计消息数量的做法:

let agent = Agent.Start(fun inbox -> let rec loop n = async { let! msg = inbox.Receive() printfn "now seen a total of %d messages" (n+1) return! loop (n+1) } loop 0

这样我们便可以获得这样的输出:

now seen a total of 0 messages

  now seen a total of
1 messages

  ....

  now seen a total of
10000 messages

    再提一次,定义代理对象的两种常见模式为命令式的:

let agent = Agent.Start(fun inbox -> async { // isolated imperative state goes here ... while do // read messages and respond ... }) 及函数式的:

  let agent
= Agent.Start(fun inbox -> let rec loop arg1 arg2 = async { // receive and process messages here ... return! loop newArg1 newArg2 } loop initialArg1 initialArg2)

再次强调,两种方法在F#都是合理的──使用递归的异步函数可能是更高级的方法,但更为函数式且更为通用。

  查看原文

0
相关文章