Monday, April 26, 2010

A Distributed Hash Table (DHT) in F#: Recursion and Pattern Matching

This post is part of a series on the development of a distributed hash table (DHT) in F#.

You can find the previous posts at the following locations:
1. .NET Remoting in F#: A Start to a Distributed Hash Table
2. A Distributed Hash Table (DHT) in F#: Moving from .NET Remoting to WCF
3. A Distributed Hash Table (DHT) in F#: Using Higher Order Functions for the Service Operation Calls

Joining the Node Cluster:

The simplest implementation defined in the Chord protocol requires each node to know only about its successor node. The circle of nodes, linked together by these successor associations, makes up the node cluster. To become part of the cluster, a node sends a join request to any node that is already in it.
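To make the successor-only model concrete, here is a minimal in-memory sketch (not Polyphony code). The cluster is modeled as a hypothetical Map from each node's address to its successor's address, and a recursive walk starting at any node visits the whole circle, stopping when it gets back to where it started.

```fsharp
// Hypothetical model of a node cluster: each node's address maps to the
// address of its successor. No node stores anything else about the ring.
let ring =
    Map.ofList [ ("node-a", "node-b"); ("node-b", "node-c"); ("node-c", "node-a") ]

// Follow successor links from start until the walk returns to start,
// collecting every node visited. Assumes a well-formed ring in which
// every node appears as a key.
let walkRing (successors: Map<string, string>) (start: string) =
    let rec walk current visited =
        let next = Map.find current successors
        if next = start then List.rev (current :: visited)
        else walk next (current :: visited)
    walk start []

printfn "%A" (walkRing ring "node-b")   // ["node-b"; "node-c"; "node-a"]
```

Because every node only knows its successor, reaching a particular node means hopping around the circle, which is exactly what the join code below does over WCF.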

The Polyphony code that handles the join is shown below. Note: the code base has grown too large to display in full. The complete solution can be found at http://github.com/dmohl/Polyphony.

Part of ChordClient.fs
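// Walk the ring, asking each node in turn whether localNode belongs directly after it.
let rec FindSuccessorNode possiblePredecessorNode startingNode localNode (chordServerProxy:IChordServerProxy) =
    let valueOption = chordServerProxy.CallServer possiblePredecessorNode CommandType.Join [|localNode|]
    match valueOption with
    | Some value -> 
        let nodeNeighbors = value :?> NodeNeighbors
        match nodeNeighbors.PredecessorNode with
        // The contacted node accepted localNode as its successor; return localNode's new successor.
        | predecessorNode when possiblePredecessorNode = predecessorNode -> nodeNeighbors.SuccessorNode
        // The next node is where the walk started, so every node has been asked:
        // attach localNode after the current node.
        | _ when startingNode = nodeNeighbors.SuccessorNode -> 
            chordServerProxy.CallServer possiblePredecessorNode CommandType.UpdateSuccessorNode [|localNode|] |> ignore
            nodeNeighbors.SuccessorNode
        // First hop: remember where the walk started, then move to the next node.
        | _ when startingNode = "" ->
            FindSuccessorNode nodeNeighbors.SuccessorNode possiblePredecessorNode localNode chordServerProxy
        // Otherwise keep walking around the ring.
        | _ -> FindSuccessorNode nodeNeighbors.SuccessorNode startingNode localNode chordServerProxy
    // The node could not be reached: fall back to localNode.
    | None -> localNode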

let JoinChordNodeNetwork localNode remoteNode (chordServerProxy:IChordServerProxy) =
    let successorNode = FindSuccessorNode remoteNode "" localNode chordServerProxy
    chordServerProxy.CallServer localNode 
        CommandType.UpdateSuccessorNode [|successorNode|] |> ignore
    successorNode    

Pattern Matching and Recursion:

Pattern matching and recursion are the two primary language features used here.

Matt Podwysocki has a nice overview of pattern matching. Within that post he states:
"One of the interesting and more powerful features of the F# language is Pattern Matching.  Don't be tempted to think of them as simple switch statements as they are much more powerful than that.  Pattern matching is used to test whether the piece under test has a desired state, find relevant information, or substitute parts with other parts.  Most functional languages such as Haskell, ML, and OCaml have them, and F# is no exception to its support."

Recursion and pattern matching often go hand in hand. A recursive function calls itself, usually with a smaller or different input, until it reaches a case that can return a result directly.
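A minimal illustration, separate from the DHT code and using hypothetical function names: each function below recurses over a list, and pattern matching distinguishes the base case (the empty list) from the recursive case (a head followed by a tail).

```fsharp
// Count the nodes in a list: [] is the base case, and head :: tail
// recurses on the tail.
let rec countNodes (nodes: string list) =
    match nodes with
    | [] -> 0
    | _ :: rest -> 1 + countNodes rest

// Return the first node that satisfies pred, or None if no node does.
// The guard (when pred node) adds a condition to the pattern.
let rec tryFindNode (pred: string -> bool) (nodes: string list) =
    match nodes with
    | [] -> None
    | node :: _ when pred node -> Some node
    | _ :: rest -> tryFindNode pred rest

printfn "%d" (countNodes [ "a"; "b"; "c" ])                      // 3
printfn "%A" (tryFindNode (fun n -> n > "a") [ "a"; "b"; "c" ])  // Some "b"
```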

In ChordClient.fs, the rec keyword marks FindSuccessorNode as a recursive function. It calls a node (possiblePredecessorNode) to ask whether the joining node should become that node's successor. The result of that call is then pattern matched against the possible outcomes. Depending on the match, FindSuccessorNode either calls itself with the next node in the ring or returns the successor node.
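The same walk can be sketched without WCF by replacing the service calls with lookups in a hypothetical in-memory ring. This is a simplified model, not Polyphony's implementation: at each hop it tests whether the ID falls in Chord's (node, successor] interval, and it handles the wrap-around from the highest ID back to the lowest with an explicit interval test rather than by detecting that the walk has come full circle.

```fsharp
// Hypothetical in-memory ring: node ID -> successor ID. IDs are two-digit
// strings so that string order matches numeric order.
let successors = Map.ofList [ ("10", "40"); ("40", "70"); ("70", "10") ]

// Chord's interval test: is id in (node, successor] on the circle?
// The else branch covers the wrap-around from the highest ID to the lowest;
// when node = successor (a single-node ring) it accepts every id.
let isBetween (node: string) (successor: string) (id: string) =
    if node < successor then id > node && id <= successor
    else id > node || id <= successor

// Walk the ring until we reach the node whose interval contains id, then
// return that node's successor. hops bounds the walk so a malformed ring
// cannot recurse forever. Assumes every node appears as a key.
let rec findSuccessor (ring: Map<string, string>) (current: string) (id: string) (hops: int) =
    let next = Map.find current ring
    match hops with
    | 0 -> None
    | _ when isBetween current next id -> Some next
    | _ -> findSuccessor ring next id (hops - 1)

printfn "%A" (findSuccessor successors "10" "55" successors.Count)   // Some "70"
printfn "%A" (findSuccessor successors "40" "85" successors.Count)   // Some "10"
```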


Part of ChordServer class in ChordServer.fs
        member x.RequestJoinChordNodeNetwork requestorNode =
            let result = 
                match requestorNode with
                // This node is alone in the ring (or the request came from this node):
                // the requestor becomes its successor.
                | _ when requestorNode = x.node || x.node = x.successorNode ->
                    x.successorNode <- requestorNode 
                    BuildNodeNeighbors x.node x.node
                // The requestor falls between this node and its successor: splice it in.
                | _ when requestorNode > x.node && requestorNode < x.successorNode -> 
                    let requestorsSuccessor = x.successorNode 
                    x.successorNode <- requestorNode
                    BuildNodeNeighbors x.node requestorsSuccessor
                // Otherwise, point the requestor at this node's successor.
                | _ -> 
                    BuildNodeNeighbors x.successorNode x.successorNode
            x.AddToFingerNodes requestorNode
            result

Once again, pattern matching is the primary language feature being used.
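One way to make this decision easier to test is to pull it into a pure function that returns a discriminated union and leaves the mutation of successorNode to the caller. This is a sketch under that assumption; JoinDecision, decideJoin, and describe are hypothetical names, and the three conditions mirror the ones above (string comparison, no wrap-around).

```fsharp
// Hypothetical result of a join request, modeled as a discriminated union
// rather than the NodeNeighbors value returned by ChordServer.
type JoinDecision =
    | AcceptAfterMe of successor: string   // requestor joins right after this node
    | ForwardTo of nextNode: string        // requestor should ask this node instead

// Pure version of the three cases in RequestJoinChordNodeNetwork. It does not
// mutate successorNode; the caller applies any update.
let decideJoin (node: string) (successor: string) (requestor: string) =
    match requestor with
    | r when r = node || node = successor -> AcceptAfterMe node
    | r when r > node && r < successor -> AcceptAfterMe successor
    | _ -> ForwardTo successor

// Callers match on the union; the compiler warns if a case is left out.
let describe decision =
    match decision with
    | AcceptAfterMe s -> sprintf "joined; your successor is %s" s
    | ForwardTo n -> sprintf "ask %s" n

printfn "%s" (describe (decideJoin "10" "40" "25"))   // joined; your successor is 40
printfn "%s" (describe (decideJoin "10" "40" "55"))   // ask 40
```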

Conclusion:

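Pattern matching and recursion are two extremely powerful weapons in the functional programmer's arsenal. Combined, they let you express a multi-step process, such as walking a ring of nodes until the right spot is found, in a few concise and readable lines.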

Sunday, April 18, 2010

Interacting with RabbitMQ via F# and Symbiote

Last week, I showed how easy it is to talk to CouchDB with F# and Symbiote. In this post, I'll show how you can start interacting with RabbitMQ by adding a dozen or so more lines of code.

RabbitMQ:
RabbitMQ is a complete and highly reliable enterprise messaging system based on the emerging AMQP standard. It is licensed under the open source Mozilla Public License and has a platform-neutral distribution, plus platform-specific packages and bundles for easy installation. (http://www.rabbitmq.com/)

Symbiote:

Symbiote is a set of libraries that reduce the radius of comprehension by providing simplified APIs "...to some of the better open source libraries available" as well as "...a way to centralize the configuration [and] dependency injection..." of said libraries (Robson, A., Symbiote).   

For those of you who haven't played with RabbitMQ and/or Symbiote, I strongly recommend checking them out!

Code:

Here's the code with the CouchDB interaction functionality as well as the RabbitMQ publish/subscribe functionality.

A couple of things to note about this sample:
1. CouchDB and RabbitMQ must be installed and running on the local machine.
2. Since Symbiote is under heavy development, the CouchDB-related code has changed slightly from the example provided last week.

module Program

open System
open Symbiote.Core
open Symbiote.Relax
open Symbiote.Daemon
open Symbiote.Jackalope
open Symbiote.Jackalope.Impl

type PersonCouchDocument =
    val id : string
    val mutable name : string
    val mutable address : string
    inherit DefaultCouchDocument 
        new (id, name, address) = 
            {id = id; name = name; address = address} then base.DocumentId <- id
        member x.Name
            with get () = x.name
        member x.Address
            with get () = x.address

type HandleMessage() =
    interface IMessageHandler<PersonCouchDocument> with
        member x.Process (message:PersonCouchDocument, messageDelivery:IMessageDelivery) =
            Console.WriteLine("Processing message for person {0}.", message.name)
            messageDelivery.Acknowledge()

type DemoService = 
    val couchServer : ICouchServer
    val bus : IBus
    new(couchServer, bus) as this = 
        {couchServer = couchServer; bus = bus} then this.Initialize()
    member public x.Initialize () =
        x.bus.AddEndPoint(fun x -> 
            x.Exchange("SymbioteDemo", ExchangeType.fanout).QueueName("SymbioteDemo")
                .Durable().PersistentDelivery() |> ignore)
    interface IDaemon with
        member x.Start () =
            do Console.WriteLine("The service has started")
            x.bus.Subscribe("SymbioteDemo", null)
            let document = new PersonCouchDocument("123456", "John Doe", "123 Main")
            x.couchServer.Repository.Save(document)
            x.bus.Send("SymbioteDemo", document)
            let documentRetrieved = 
                x.couchServer.Repository.Get<PersonCouchDocument>(document.DocumentId);
            do Console.WriteLine(
                "The document with name {0} and address {1} was retrieved successfully", 
                documentRetrieved.Name, documentRetrieved.Address)
            do x.couchServer.DeleteDatabase<PersonCouchDocument>()
        member x.Stop () =
            do Console.WriteLine("The service has stopped")

do Assimilate
    .Core()
    .Daemon(fun x -> (x.Name("FSharpDemoService")
                                    .DisplayName("An FSharp Demo Service")
                                    .Description("An FSharp Demo Service")
                                    .Arguments([||]) |> ignore))
    .Relax(fun x -> x.Server("localhost") |> ignore) 
    .Jackalope(fun x -> x.AddServer(fun s -> s.Address("localhost").AMQP08() |> ignore) |> ignore)
    .RunDaemon() 


Conclusion:

As mentioned previously, with the help of Symbiote it only takes a dozen or so additional lines of code to add support for RabbitMQ to our previous example.  The complete source for this solution can be found at http://github.com/dmohl/Symbiote-Jackalope-Example.


Thursday, April 15, 2010

Talking to CouchDB via F# and Symbiote

One of the cooler technologies that I have been playing with lately is CouchDB.
"Apache CouchDB is a document-oriented database that can be queried and indexed in a MapReduce fashion using JavaScript. CouchDB also offers incremental replication with bi-directional conflict detection and resolution" (http://couchdb.apache.org/).  

Alex Robson recently announced a set of libraries that, among other things, make talking with CouchDB from a .NET language very easy. Here's an F# example (the complete source can be found at http://github.com/dmohl/Symbiote--Relax--Example):

module Program

open System
open Symbiote.Core
open Symbiote.Relax
open Symbiote.Daemon

type PersonCouchDocument =
    val id : string
    val mutable name : string
    val mutable address : string
    inherit DefaultCouchDocument 
        new (id, name, address) = 
            {id = id; name = name; address = address} then base.DocumentId <- id
        member x.Name
            with get () = x.name
        member x.Address
            with get () = x.address

type DemoService = 
    val couchServer : ICouchServer
    new(couchServer) = {couchServer = couchServer}
    interface IDaemon with
        member x.Start () =
            do Console.WriteLine("The service has started")
            let document = new PersonCouchDocument("123456", "John Doe", "123 Main")
            x.couchServer.Repository.Save(document)
            let documentRetrieved = 
                x.couchServer.Repository.Get<PersonCouchDocument>(document.DocumentId);
            do Console.WriteLine(
                "The document with name {0} and address {1} was retrieved successfully", 
                documentRetrieved.Name, documentRetrieved.Address)
            do x.couchServer.DeleteDatabase<PersonCouchDocument>()
        member x.Stop () =
            do Console.WriteLine("The service has stopped")

do Assimilate
    .Core()
    .Daemon(fun x -> (x.Name("FSharpDemoService")
                                    .DisplayName("An FSharp Demo Service")
                                    .Description("An FSharp Demo Service")
                                    .Arguments([||]) |> ignore))
    .Relax(fun x -> x.Server("localhost") |> ignore) 
    .RunDaemon() 

As you can see, interacting with CouchDB via F# and Symbiote is a snap. Symbiote has a large number of additional features and advantages. To learn more about it, go to http://sharplearningcurve.com/wiki/Symbiote-Main.ashx.


Sunday, April 11, 2010

A Distributed Hash Table (DHT) in F#: Using Higher Order Functions for the Service Operation Calls

In my last post, I provided a start to a distributed hash table in F# using WCF. One of the major limitations of that example was that only two nodes could ever be on the node network at a given time. Over the last week, I added the functionality that allows additional nodes to join the node network dynamically. Among other things, this involved adding a few operations to the service. These additions quickly showed that the original implementation of the service calls was not maintainable, not to mention a serious violation of the Single Responsibility Principle (SRP). To resolve this, the channel factory and the service operation calls have been split into separate modules.

At first, I had each service operation call in a separate function that was called from the RunCommand function.  I then realized that higher order functions would make this so much easier!  The final result is shown below:

ChordServerProxy.fs
module ChordServerProxy

open System
open System.ServiceModel
open System.Runtime.Serialization
open ChordCommon
open ChordServerFacade
open ChordContracts

type IChordServerProxy = interface   
    abstract CallServer : server:string -> operationContract:CommandType -> inputArguments:string[] -> obj option
end

type ChordServerProxy() = 
    interface IChordServerProxy with
        member this.CallServer server operationContract inputArguments =
            let binding = new NetTcpBinding()
            let service = new ChannelFactory<IChordServer>(binding, server)  
            try                    
                try
                    let proxy = service.CreateChannel()
                    RunCommand proxy operationContract inputArguments
                with
                | _ -> 
                    None
            finally
                // Close the factory if it is healthy; abort it if it faulted or if Close fails.
                match service.State with
                | serviceState when serviceState <> CommunicationState.Faulted -> 
                    try
                        service.Close()
                    with
                    | _ ->
                        service.Abort()
                | _ -> service.Abort()

ChordServerFacade.fs
module ChordServerFacade

open System
open System.ServiceModel
open ChordContracts
open ChordCommon

let Execute command proxy inputArguments : obj option =
    match command proxy inputArguments with
    | null -> None
    | value -> Some(value)

let RunCommand proxy operationContract inputArguments : obj option =
    let command =
        match operationContract with
        | CommandType.Put -> fun (proxy:IChordServer) (inputArguments:string[]) -> 
                                (proxy.PutValueByKey inputArguments.[1] inputArguments.[2])
        | CommandType.Get -> fun (proxy:IChordServer) (inputArguments:string[]) -> 
                                (proxy.GetValueByKey inputArguments.[1])
        | CommandType.Join -> fun (proxy:IChordServer) (inputArguments:string[]) -> 
                                (proxy.RequestJoinChordNodeNetwork inputArguments.[0] :> obj)
        | CommandType.UpdateSuccessorNode -> 
                              fun (proxy:IChordServer) (inputArguments:string[]) -> 
                                (proxy.UpdateSuccessorNode inputArguments.[0] :> obj)
        | CommandType.GetSuccessorNode -> fun (proxy:IChordServer) (inputArguments:string[]) -> 
                                (proxy.GetSuccessorNode () :> obj)
    Execute command proxy inputArguments
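To see the higher-order pattern in isolation, here is a self-contained sketch that runs without WCF. CommandType, IStore, and MemoryStore are hypothetical stand-ins for the ChordContracts types: runCommand picks a function value for the requested operation, and execute applies it and maps a null result to None.

```fsharp
open System.Collections.Generic

// Hypothetical stand-ins for the ChordContracts types.
type CommandType =
    | Put = 0
    | Get = 1

type IStore =
    abstract PutValueByKey : key: string -> value: string -> obj
    abstract GetValueByKey : key: string -> obj

// In-memory implementation used only for this sketch.
type MemoryStore() =
    let table = Dictionary<string, string>()
    interface IStore with
        member this.PutValueByKey key value =
            table.[key] <- value
            box "Put Complete"
        member this.GetValueByKey key =
            match table.TryGetValue(key) with
            | true, value -> box value
            | _ -> null

// Apply the chosen command and turn a null result into None.
let execute (command: IStore -> string[] -> obj) (proxy: IStore) (args: string[]) : obj option =
    match command proxy args with
    | null -> None
    | value -> Some value

// Choose a function value for the operation, then hand it to execute.
let runCommand (proxy: IStore) (operation: CommandType) (args: string[]) =
    let command =
        match operation with
        | CommandType.Put -> fun (p: IStore) (a: string[]) -> p.PutValueByKey a.[1] a.[2]
        | CommandType.Get -> fun (p: IStore) (a: string[]) -> p.GetValueByKey a.[1]
        | _ -> fun (_: IStore) (_: string[]) -> null
    execute command proxy args

let store = MemoryStore() :> IStore
runCommand store CommandType.Put [| "put"; "1"; "test1" |] |> ignore
printfn "%A" (runCommand store CommandType.Get [| "get"; "1" |])
```

With this shape, adding an operation means adding one match case that returns a function, rather than another standalone call site.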

As always, you can get the complete source code from http://github.com/dmohl/Polyphony.
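The close-or-abort cleanup in ChordServerProxy's finally block can also be factored into a higher-order function, so each call site only supplies the work to do. A minimal sketch, assuming a hypothetical IChannelLike interface in place of WCF's ICommunicationObject so that it runs without System.ServiceModel:

```fsharp
// Hypothetical stand-in for WCF's ICommunicationObject, reduced to what the
// cleanup logic needs.
type IChannelLike =
    abstract IsFaulted : bool
    abstract Close : unit -> unit
    abstract Abort : unit -> unit

// Run action against the channel and always clean up: Close a healthy
// channel, Abort a faulted one, and Abort if Close itself throws.
let useChannel (channel: IChannelLike) (action: IChannelLike -> 'T option) : 'T option =
    try
        try
            action channel
        with _ ->
            None
    finally
        if channel.IsFaulted then
            channel.Abort()
        else
            try
                channel.Close()
            with _ ->
                channel.Abort()

// A fake channel that records which cleanup method was called.
type FakeChannel(faulted: bool) =
    let mutable calls : string list = []
    member this.Calls = List.rev calls
    interface IChannelLike with
        member this.IsFaulted = faulted
        member this.Close() = calls <- "Close" :: calls
        member this.Abort() = calls <- "Abort" :: calls

let healthy = FakeChannel(false)
let answer = useChannel (healthy :> IChannelLike) (fun _ -> Some 42)
printfn "%A %A" answer healthy.Calls   // Some 42 ["Close"]
```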


Sunday, April 4, 2010

A Distributed Hash Table (DHT) in F#: Moving from .NET Remoting to WCF

In my last post I provided a start to a distributed hash table (DHT) using F#, the Chord protocol, and .NET remoting.  In this post, I'll show how that code has changed due to a migration from .NET remoting to WCF.

As a side note, I've codenamed this little project Polyphony.

Here's a summary of the primary changes:

- Chord.fs has gone away.
- ChordServer now contains contract specific information.
- ChordClient now calls the service operations.

The primary code changes are provided below.  The complete project is available at http://github.com/dmohl/polyphony.

ChordServer.fs
module ChordServer

open System
open System.ServiceModel
open System.Collections
open System.Configuration
open System.Net

[<ServiceContract>]  
type IChordServer = interface   
    [<OperationContract>]  
    abstract PutValueByKey : key:obj -> value:obj -> unit  
    [<OperationContract>]  
    abstract GetValueByKey : key:obj -> obj
end

[<ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)>]
type ChordServer = class
    val hashTable : Hashtable
    new () = {hashTable = new Hashtable()}
    interface IChordServer with
        member x.PutValueByKey key value =
            x.hashTable.Add(key, value)
        member x.GetValueByKey key =
            x.hashTable.Item(key)
end

let logException (ex:Exception) =
    Console.WriteLine("Error: {0}", ex.Message)
    Console.WriteLine(ex.Source)
    
let Initialize () =
    try
        let localServer = ConfigurationManager.AppSettings.Item("LocalServer")
        Console.WriteLine("Starting Server: {0}", localServer)
        let host = new ServiceHost(typeof<ChordServer>)
        host.AddServiceEndpoint(typeof<IChordServer>,
                    new NetTcpBinding(), localServer) |> ignore       
        host.Open()
        Some(host)
    with
    | ex -> logException ex
            None
           
let Stop (host: ServiceHost option)  =
    try
        match host with
        | None -> ()
        | Some(host) ->
            if host.State <> CommunicationState.Closed then
                host.Close()
                Console.WriteLine("Stopping Server")               
    with
    | ex -> logException ex           

ChordClient.fs
module ChordClient

open System
open System.Configuration
open System.ServiceModel

let remoteServer = ConfigurationManager.AppSettings.Item("RemoteServer")
let localServer = ConfigurationManager.AppSettings.Item("LocalServer")

let CallServer (server:string) (operationContract:string) (inputArguments:string[]) =
    let service = new ChannelFactory<ChordServer.IChordServer>(
                        new NetTcpBinding(), server)  
    try                    
        try
            let proxy = service.CreateChannel()        
            let result = match operationContract with
                         | "put" ->
                             proxy.PutValueByKey inputArguments.[1] inputArguments.[2] 
                             "Put Complete" :> obj
                         | "get" -> 
                             proxy.GetValueByKey inputArguments.[1]     
                         | _ -> "Unknown" :> obj 
            result             
        with
        | ex -> 
            Console.WriteLine ex.Message
            "Unknown" :> obj
    finally
        service.Close()

let RunCommand(input:string) : unit =
    let inputArguments = input.Split(' ')
    let result = 
        match inputArguments.[0] with
        | "put" ->
            CallServer localServer inputArguments.[0] inputArguments |> ignore
            sprintf "PUT Key:%A Value:%A" inputArguments.[1] inputArguments.[2] :> obj   
        | "get" -> 
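            // Ask the local node first, then the remote node, and stop there so a
            // key that exists on neither node cannot recurse forever.
            let rec getValue server =
                let value = CallServer server inputArguments.[0] inputArguments
                match value with
                | null when server = localServer -> getValue remoteServer
                | null -> "Key not found" :> obj
                | _ -> value
            getValue localServer
        | _ -> "unknown command" :> obj
    Console.WriteLine(result)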
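The local-then-remote lookup generalizes to any number of nodes. A sketch, assuming a hypothetical lookup function that behaves like CallServer and returns null on a miss: because the server list is finite, the recursion ends once every server has been asked, and a key found nowhere yields None.

```fsharp
// Ask each server in order and stop at the first hit. lookup is a hypothetical
// stand-in for CallServer: it returns null when the server lacks the key.
let rec tryServers (lookup: string -> obj) (servers: string list) : obj option =
    match servers with
    | [] -> None                                   // every server has been asked
    | server :: rest ->
        match lookup server with
        | null -> tryServers lookup rest
        | value -> Some value

// Fake data for the sketch: only the remote server holds key "1".
let data = Map.ofList [ ("tcp://remote", Map.ofList [ ("1", "test1") ]) ]

let lookupKey (key: string) (server: string) : obj =
    match Map.tryFind server data |> Option.bind (Map.tryFind key) with
    | Some value -> box value
    | None -> null

let servers = [ "tcp://local"; "tcp://remote" ]
printfn "%A" (tryServers (lookupKey "1") servers)
printfn "%A" (tryServers (lookupKey "2") servers)
```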

Thursday, April 1, 2010

.NET Remoting in F#: A Start to a Distributed Hash Table (DHT)

Over the last few weeks I've been talking with a few guys on my team (@a_robson, @therealhoff, @elijahmanor, @ifandelse) about distributed hash tables (DHTs). A DHT seems like a fun thing to build, and a good fit for F#.

I decided to start down the path of implementing the Chord protocol (http://pdos.csail.mit.edu/papers/chord:sigcomm01/chord_sigcomm.pdf). The first problem to solve was communication between the nodes. The ideal solution seems to be Erlang's message-passing model. Unfortunately, that model doesn't seem to be available in .NET land (there is MPI.NET, but it appears to require Windows HPC Server 2008 or Microsoft Compute Cluster Server 2003 for use across multiple machines). After some research, I decided on .NET Remoting (though I may switch to a sockets-based approach in the future). The proof of concept (POC) is provided below.
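For comparison, F# does provide an in-process take on Erlang-style message passing: MailboxProcessor agents, which handle the messages in their queue one at a time. They do not communicate across machines, so they do not solve the node-to-node problem described above, but the hash-table agent below illustrates the programming model. TableMessage and tableAgent are hypothetical names, not part of Polyphony.

```fsharp
// Messages the agent understands. Get carries a reply channel so the caller
// can wait for the answer.
type TableMessage =
    | Put of key: string * value: string
    | Get of key: string * reply: AsyncReplyChannel<string option>

// The agent owns its Map and handles one message at a time, so the table
// is never touched by two threads at once.
let tableAgent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop (table: Map<string, string>) =
            async {
                let! message = inbox.Receive()
                match message with
                | Put (key, value) ->
                    return! loop (Map.add key value table)
                | Get (key, reply) ->
                    reply.Reply(Map.tryFind key table)
                    return! loop table
            }
        loop Map.empty)

tableAgent.Post(Put ("1", "test1"))
// Messages are handled in order, so this Get sees the earlier Put.
let found = tableAgent.PostAndReply(fun reply -> Get ("1", reply))   // Some "test1"
```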


Seeing it work:

1. First, run Polyphony.exe from the bin\Server directory.
2. Next, run Polyphony.exe from the bin\Debug directory.
3. Now, type "put 1 test1" in one of the consoles and "put 2 test2" in the other.
4. Now, type "get 1" in either console. The app will first check its local hash table. If the value is not found, it moves on to the second node.

The next step is to make this more scalable and more robust. Then it's on to some of the specific Chord protocol functionality. The source is available at http://github.com/dmohl/Polyphony and will continue to evolve. Let me know if you want to participate in the fun.


Example:

Program.fs
module Polyphony

open System
open System.Configuration

let port = ConfigurationManager.AppSettings.Item("ServerPort")

ChordServer.Initialize(Convert.ToInt32(port)) |> ignore
ChordClient.Initialize() |> ignore

Console.Write "\nEnter Command:"
let mutable input = Console.ReadLine()

// Keep reading commands until the user types "quit" or input ends.
while input <> null && input <> "quit" do
    ChordClient.RunCommand input
    Console.Write "\nEnter Command:"
    input <- Console.ReadLine()
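The mutable while loop in Program.fs can also be written as a tail-recursive function, in keeping with the recursion used elsewhere in this series. In this sketch, commandLoop is a hypothetical name; the input source and the command handler are passed in as functions, so the loop can be driven by Console.ReadLine and ChordClient.RunCommand, or by canned input as in the example.

```fsharp
open System.Collections.Generic

// Tail-recursive command loop: the recursive call is the last thing each
// iteration does, so it runs in constant stack space.
let rec commandLoop (readLine: unit -> string) (runCommand: string -> unit) =
    printf "\nEnter Command:"
    match readLine () with
    | null | "quit" -> ()          // end of input or an explicit quit
    | input ->
        runCommand input
        commandLoop readLine runCommand

// In Program.fs this could be: commandLoop Console.ReadLine ChordClient.RunCommand
// Here it is driven by a queue of canned commands instead.
let inputs = Queue<string>([ "put 1 test1"; "get 1"; "quit"; "ignored" ])
let handled = ResizeArray<string>()
commandLoop (fun () -> inputs.Dequeue()) (fun command -> handled.Add command)
```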

Chord.fs
module Shared

open System
open System.Collections
open System.Runtime.Remoting

type Chord = class
    val hashTable : Hashtable
    inherit MarshalByRefObject 
    new () = {hashTable = new Hashtable()}
    member x.PutValueByKey key value =
        x.hashTable.Add(key, value)
    member x.GetValueByKey key =
        x.hashTable.Item(key)
end

ChordServer.fs
module ChordServer

open System
open System.Runtime.Remoting
open System.Runtime.Remoting.Channels
open System.Runtime.Remoting.Channels.Tcp

let Initialize port =
    let tcpChannel = new TcpChannel(port)
    Console.WriteLine("Polyphony Server started on port {0}...", port)    
    ChannelServices.RegisterChannel(tcpChannel, false) |> ignore
    let commonType = typeof<Shared.Chord>
    RemotingConfiguration.RegisterWellKnownServiceType(commonType,
        "chord", WellKnownObjectMode.Singleton) |> ignore

ChordClient.fs
module ChordClient

open System
open System.Configuration
open System.Runtime.Remoting
open System.Runtime.Remoting.Channels
open System.Runtime.Remoting.Channels.Tcp

let serverInformation = ConfigurationManager.AppSettings.Item("ClientServer")
let selfServerInformation = ConfigurationManager.AppSettings.Item("SelfServer")

let Initialize() =
    Console.WriteLine("Polyphony Client started and pointing to server {0}...", serverInformation)    
    let localObject = Activator.GetObject(typeof<Shared.Chord>, selfServerInformation) :?> Shared.Chord 
    localObject.PutValueByKey "test" "testValue"

let RunCommand(input:string) : unit =
    
    let inputArguments = input.Split(' ')
    let result = 
        match inputArguments.[0] with
        | "put" -> 
            let localObject = Activator.GetObject(typeof<Shared.Chord>, selfServerInformation) :?> Shared.Chord     
            localObject.PutValueByKey inputArguments.[1] inputArguments.[2] |> ignore
            sprintf "PUT Key:%A Value:%A" inputArguments.[1] inputArguments.[2] :> obj   
        | "get" -> 
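            // Ask this node first, then the other node, and stop there so a key
            // that exists on neither node cannot recurse forever.
            let rec getValue server =
                let chord = Activator.GetObject(typeof<Shared.Chord>, server) :?> Shared.Chord
                match chord.GetValueByKey inputArguments.[1] with
                | null when server = selfServerInformation -> getValue serverInformation
                | null -> "Key not found" :> obj
                | value -> value
            getValue selfServerInformation
        | _ -> "unknown command" :> obj
    Console.WriteLine(result)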

App.config
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <add key="ServerPort" value="9877"/>
    <add key="SelfServer" value="tcp://localhost:9877/chord"/>
    <add key="ClientServer" value="tcp://localhost:9876/chord"/>
  </appSettings>
</configuration>
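Both RunCommand functions index into inputArguments directly, so a command such as "put 1" without a value throws an IndexOutOfRangeException. A sketch of an alternative, assuming a hypothetical Command union, uses F# array patterns to check the shape of the input before acting on it:

```fsharp
open System

// Hypothetical representation of the console commands.
type Command =
    | PutCommand of key: string * value: string
    | GetCommand of key: string
    | Unknown of text: string

// Array patterns match the whole shape of the split input, so "put 1"
// (missing value) becomes Unknown instead of throwing.
let parseCommand (input: string) =
    match input.Split([| ' ' |], StringSplitOptions.RemoveEmptyEntries) with
    | [| "put"; key; value |] -> PutCommand (key, value)
    | [| "get"; key |] -> GetCommand key
    | _ -> Unknown input

printfn "%A" (parseCommand "put 1 test1")
printfn "%A" (parseCommand "put 1")
```

RunCommand could then match on the Command cases, and the compiler would warn about any case it forgets to handle.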