Skip to content

Scatter Gather Pattern

Implementing the Scatter Gather ESB Pattern with Neuron Processes

Overview

This sample presents a simple and reliable design using the Neuron Process Designer and Runtime to resolve a common scatter-gather application integration problem described in the book Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions.

The Scatter Gather pattern is manifested when a requester sends an asynchronous request to a number of providers, which send their replies asynchronously to the requester. A common use for this pattern is when a request for quote is received, and that request must be broadcast out to N number of providers for bid. Each provider will (or may) reply to the request with their respective bid. From there the replies are aggregated and the highest bidder may win.

Another scenario where this pattern is commonly employed is in the context of order processing. In those cases, each order item that is not currently in stock could be supplied by one of multiple external suppliers. However, the suppliers may or may not have the respective item in stock; they may charge a different price and may be able to supply the part by a different date. To fill the order in the best way possible, quotes are requested from all suppliers and then a decision is made as to which one provides the best term for the requested item as shown below.

In these scenarios it would not be uncommon if each service that the message was broadcast to required its own unique translation/transformational or other preprocessing requirements before receiving the message. Business rules may need to be injected and, perhaps even the services to broadcast to may need to be dynamically resolved at runtime. In the figure above, each service may represent an individual vendor.

An example of some business requirements addressed by this pattern’s implementation may be:

  • Contact N airlines simultaneously for price quotes
  • Buy ticket from either airline if price<=$$
  • Buy the cheapest ticket if price >$$
  • Buy the ticket from the first airline to respond

In summary, the Scatter Gather pattern’s goal is to send the same message to multiple recipients which will (or may) reply to it. Wait for all (or some) of the replies and aggregate them into a single response message. With this goal in mind, it can be observed that the Scatter Gather pattern includes the Aggregator (specifically Service Aggregation/Composition) pattern, but may also be composed of several other patterns. For example, when broadcasting the message, a Splitter and Recipient List pattern may be employed.

The Scatter Gather design relies on runtime services, message delivery, correlation, transaction management and web service routing of Neuron ESB.

Note: Service composition, the act of building an application out of multiple services, is usually depicted as a “has a” relationship and the whole is composed of the parts. In contrast, aggregation is a “uses a” type of relationship. The differences are quite subtle but nevertheless important to grasp. In composition relationships, the life cycles of parts are tied to the lifecycle of the whole and when the whole no longer exists, the parts no longer exist either. In aggregation, the parts exist independent of the whole and can go on living after the entity that uses them no longer exists.

This paper (and accompanying sample) establishes how a Neuron Process can be used to broadcast a message, asynchronously, to N number of services (recipients). Within the process, the replies are then aggregated and either published to the bus, or alternatively, they can be returned to the original calling client. The process goes further to illustrate how to dynamically resolve which services to execute at runtime, mapping them to Topics, which are in turn mapped to Neuron Service Connectors. This provides a loosely coupled solution as subscriptions are used to route to the services in the broadcast list. As part of the solution, transformations (XSLT) may be associated with each service, and hence a way to dynamically retrieve a transform from the Neuron ESB Configuration store at runtime and execute it is also demonstrated.

Sample Manifest Description

The accompanying sample is composed of the following 4 components located under the default Neuron installation directory:

  • Neuron ESB Configuration named ScatterGatherProcessSample. This contains all the Neuron specific artifacts used for this solution (i.e. Topics, Parties, Scatter Gather Process, Client and Service Connectors) pre-configured. This configuration can be found in \Samples\Configurations\ScatterGatherProcessSample.
  • Visual Studio solution named Scatter Gather Solution. This solution contains the three following Visual Studio C# projects:
    • ContosoClientRequest – A console application used to make the client side WCF service request call into the Neuron ESB hosted service endpoint (Neuron Client Connector). The code is displayed in Code Fragment 2 in the Designing the Process section of this document.
    • NewMartQuoteService – A WCF service endpoint hosted within a console application. This Web service endpoint represents a Quote Service hosted by a fictional vendor named New Mart.
    • OldMartQuoteService – A WCF service endpoint hosted within a console application. This Web service endpoint represents a Quote Service hosted by a fictional vendor named Old Mart.

The Visual Studio solution can be found in \Samples\Scenarios\ScatterGatherPattern.

At runtime, the sample provides a demonstration of the following scenario, which is dependent upon the implementation of the Scatter Gather pattern described in this document.

Scenario Description

Step 1Contoso Supply House is a fictional, modern, web based company that sells toys, appliances and services to customers over the internet. Contoso hosts their own web site and exposes a public facing web service so external distributors can automate the placement of orders.

Neuron Implementation: The public web service is hosted by Neuron ESB and is configured within the Neuron Client Connector named, ContosoQuoteService, exposed on the url, http://localhost:9001.

Step 2: An external distributor submits a purchase request for red fire engine toy trucks to Contoso through the public facing web service.

Neuron Implementation: The request is submitted by running the Visual Studio Console Application named, ContosoClientRequest.exe.

Step 3: Contoso receives the distributor’s request. However, Contoso doesn’t keep any inventory in stock. Instead they submit all orders to external vendors to get the best price for the items requested. The list of vendors to query is selected based on the items ordered or by the distributor’s preference.

Neuron Implementation: The request is received by the Neuron Client Connector, ContosoQuoteService. This Client Connector is configured to execute the Neuron ESB Business Process named Scatter Gather, which is described in the Designing the Process section of this document.

Step 4: Contoso has an automated process which determines the vendors to query for best price.

Neuron Implementation: The vendors to query are extracted from the custom SOAP header of the original distributor’s purchase request from within the process. NOTE: In most real world scenarios, the list of vendors would probably be resolved from a database lookup (or some other means) within the process.

Step 5: Each vendor hosts their own Quote Service, a web service that Contoso can submit their distributor’s price queries against.

Neuron Implementation: Each vendor’s web service is represented by a Neuron Service Connector. There are two vendors setup, Old Mart and New Mart.

The Old Mart web service endpoint is represented by the following:

  • A Service Connector named OldMartQuoteService.
  • OldMartQuoteService is called directly from the Scatter Gather process via the Service Endpoint process step.

The New Mart web service endpoint is represented by the following:

  • A Service Connector named NewMartQuoteService.
  • NewMartQuoteService is called directly from the Scatter Gather process via the Service Endpoint process step.

Step 6: Contoso’s automated process submits a query to each vendor’s Quote Service, aggregating the results and returning the options to the distributor.

Neuron Implementation: The original distributor’s purchase request is transformed according to each vendor’s Quote Service requirements. Then the vendor’s Quote service is called asynchronously.

  • The responses are combined, optionally transformed and returned to the distributor as a SOAP response message.

Designing the Process

The Scatter Gather process depicted in Figure 2 is built using the following process steps:

  • Code, Split, Transform – Xslt, Service Endpoint, Decision, Cancel

The process depicted begins with a Code process step named “Get List of Services”.

Get List of Services

The “Get List of Services” responsibility is to determine the list of services to be called, as well as any transformation requirements specific for those services. There are a number of common ways in which this information could be resolved, depending if the information is sent along with the original request, or it is retrieved at runtime. Resolution strategies can include and are not limited to the following:

  • Service list information is sent at runtime by client via custom SOAP Headers
  • Service list information is maintained in a static list within Neuron
  • Service list information is obtained by querying a database at runtime
  • Service list information is retrieved through the execution of business rules at runtime
empty Split process
Figure 1: An empty Split process component as displayed in the Neuron Process designer.Figure 2: The completed Scatter Gather process as displayed in the Neuron Process designer.
Note: Several process steps (specifically Code, Publish, Split and Decision) within the Neuron Process Designer support direct C# inline editing. In most cases, users simply right click the process step and select “Edit Code” from the short cut menu to display the Code Editing Window. The Code Editing Window provides full access to .NET 4.8 Framework, supports IntelliSense, displays real time compile errors and supports referencing external .NET assemblies. Compiling processes into .NET assemblies is not necessary. All code is dynamically compiled either the first time the component is executed within a process, or after modification.

For example, a list of services to broadcast to, (and later aggregate the responses of), could look very similar to the xml fragment below. Each service node has 3 attributes, named topic, action and transform:

<NeuronServiceList xmlns='urn:xmlns:neuronesb-com:soapheaders'>
    <services>
        <service serviceEndpoint='OldMartQuoteService'
        action='http://schema.neuron.sample/oldmart/broadcast/IQuote/RequestQuote'
        transform='QuoteRequest_To_OldMartQuote'>
        </service>
        <service serviceEndpoint='NewMartQuoteService'
        action='http://schema.neuron.sample/newmart/broadcast/IBid/RequestBid'
        transform='QuoteRequest_To_NewMartQuote'>
        </service>
    </services>
</NeuronServiceList>

Code Fragment 1: An example XML document representing a list of services to broadcast an incoming message to.

Each service is represented by a “service” node. The serviceEndpoint attribute determines which service connector the Service Endpoint process step should call. The action attribute is used to set the Action Header field on the Neuron ESB Message. The transform attribute represents the name of an XSLT stored in the Neuron ESB configuration store.

In Code Fragment 1, the serviceEndpoint attribute holds the name of which vendor’s Quote service to call. It’s value is used to dynamically configure the Service Endpoint step with the correct value.

The action attribute represents the Action (i.e. Operation/Method) on the service endpoint that needs to be called.

The transform attribute will hold the name of the XSLT stored in the Neuron ESB configuration store. At runtime, the actual XSLT content will be retrieved by using the transform attribute value as the lookup key, passing that key to Neuron’s Transform – Xslt Process Step.

Note: The XML format of the service list, property names and namespaces used in this example are intended only for demonstration purposes. In lieu of a static list, a database lookup or LOB (Line of Business) application query can be used. In fact the most common implementations usually employ some of kind of database lookup or query to retrieve the necessary list of services to call.

For the sake of expediency, this example will use a static list of services defined within the first Code Process Step within the process. WCF (Windows Communication Foundation) can be used to make the initial SOAP request to a generic web service on ramp hosted by Neuron (through configuration of a Neuron Client Connector) as shown in Code Fragment 2:

using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Xml;
namespace Neuron.Esb.Samples
{
    static class Utility
    {
        const string _msg = @"<PurchaseRequest>
                                  <Products>
                                     <Product name='BigBox' quanity='10' location='Denver'></Product>
                                  </Products>
                            </PurchaseRequest>";
        public static void PipelineByWebService()
        {
            XmlReader xmlReader = null;
            Message reqMsg = null;
            
            // Sending a request/response to a client connector
            using (var chan = new ChannelFactory<IRequestChannel>(new WSHttpBinding(), 
                new EndpointAddress("http://localhost:9001")))
            {
                var proxy = chan.CreateChannel();
                using (xmlReader = XmlReader.Create(
                    new System.IO.MemoryStream(System.Text.Encoding.UTF8.GetBytes(_msg))))
                {
                    using (reqMsg = Message.CreateMessage(chan.Endpoint.Binding.MessageVersion, "", xmlReader))                                                                          
                    {
                        Console.WriteLine(proxy.Request(reqMsg).ToString());
                        reqMsg.Close();
                    }
                    xmlReader.Close();
                }
                proxy.Close();
                chan.Close();
            }
        }
    }
}

Code Fragment 2: Sample WCF C# code within Microsoft Visual Studio. This represents code that a client can execute to submit a message to a Neuron-hosted web service (Neuron ESB Client Connector) listening on port 9001.

In Code Fragment 2, the original request is sent to the Client Connector’s endpoint address (http://localhost:9001 ). Once the message is received by Neuron, a Code Process Step is used to load the static list of services depicted in Code Fragment 1 into an Xml Node List object, saving it as a Neuron process variable named, “ServiceConfiguration”. By double-clicking the Code Process Step named “Get List of Services” within the Scatter Gather process, the C# code fragment can be seen in Code Fragment 3:

// *******************************************************************************
// First retrieve the list of services to call.  This could come in via soap headers,
// through an external database call, lookup, or from wherever is appropriate for
// the scenario.
// In Neuron, each service is represented by a topic, making this essentially a list of
// topics (or sub topics) to route to, which in turn results in a service call.
// In the last step, we add the list of services as a property to the existing process
// context so that it can be retrieved in the next process step
// *******************************************************************************
string nameSpace = "urn:xmlns:neuronesb-com:soapheaders";
string prefix = "n";
string serviceList =
   @"<NeuronServiceList xmlns='urn:xmlns:neuronesb-com:soapheaders'>
   <services>
     <service topic='Finance.Vendors.OldMart.QuoteService'  
       action='http://schema.neuron.sample/oldmart/broadcast/IQuote/RequestQuote'
       transform='QuoteRequest_To_OldMartQuote'></service>
     <service topic='Finance.Vendors.NewMart.QuoteService'
       action='http://schema.neuron.sample/newmart/broadcast/IBid/RequestBid'
       transform='QuoteRequest_To_NewMartQuote'></service>
   </services>
   </NeuronServiceList>";

// Load the list of services into an XML Document, add the namespace and retrieve the
// service nodes,
// persisting the node list into the process's context property
System.Xml.XmlDocument xmlDoc = new System.Xml.XmlDocument();
xmlDoc.LoadXml(serviceList);
System.Xml.XmlNamespaceManager nsMgr = new System.Xml.XmlNamespaceManager(xmlDoc.NameTable);
nsMgr.AddNamespace(prefix, nameSpace);
System.Xml.XmlNodeList nodeList = xmlDoc.SelectNodes("//n:service", nsMgr);
context.Properties.Add("ServiceConfigurations",nodeList);
 

Code Fragment 3: The Code Editing Window displays the C# code contents of the “Get List of Services” Code process step in the Scatter Gather process example displayed in Figure 2. Code can be edited by right-clicking on Code process step and selecting the “Edit Code” short cut menu

In the C# code, the Xml list of services is loaded into a System.Xml.XmlDocument object that an XPATH statement is executed against to retrieve a list ofSystem.Xml.XmlNodes. The list of System.Xml.XmlNodes represents the list of services submitted by the client. Following this, is the final line of code:

context.Properties.Add("ServiceConfigurations",nodeList);
Note: The “context” variable represents the Neuron.Pipelines.PipelineContext object passed to the Code process step by the Process Runtime. This provides users access to the process context, as well as to the actual ESB Message object (accessed through the Data property).

This last line of code persists the list of services, (and System.Xml.XmlNodeList object), as a property of the current process instance (context) so that it may be retrieved later, within other steps in the process. Specifically, the list is saved as a property of the current process context before runtime control is passed to the Split process step named “Broadcast and Aggregate”.

Broadcast and Aggregate

Code Split

The second process step in the Scatter Gather process is the Code Split portion of the “Broadcast and Aggregate” Split process step. In this process step the message is cloned into a collection of identical messages, with each message marked to be routed to a specific service contained in the list retrieved from the custom SOAP header.

Here is where it’s necessary to understand some of the fundamentals of Neuron Processes and the Split process step. Generally, the Split process step allows users to very easily de-batch an incoming message (either by using a simple XPATH expression in a property grid, or by using the Code Editing Window) into their individual parts, submitting these individual parts as a collection of messages to the next stage of the Split process step, called the Execution Block (labeled “Steps”). This Execution Block can contain N number of other process steps. An instance of the Execution Block will be executed for each message in the collection of messages submitted to it. Additionally, the Execution Block can be set to run either synchronously (process one message at a time) or asynchronously. In asynchronously mode, the .NET thread pool is utilized to submit each message in the batch to its own Execution Block instance.

The Code Split portion of the Split process step submits a List of Neuron.Pipelines.PipelineContext objects, each one containing a message in the batch, to the Execution Block. Since the Split process step “split type” property is set to Code, (rather than XPATH), this collection must be created manually by using the following C# code fragment in the Code Split portion of the Split process step:

List<PipelineContext<ESBMessage>> contexts = new List<PipelineContext<ESBMessage>>();
 

After the collection is created, the list of services previously persisted in the “Get List of Services” Code process step must be retrieved as follows:

System.Xml.XmlNodeList nodeList = (System.Xml.XmlNodeList)context.Properties["ServiceConfigurations"];

Once the nodeList object is retrieved, loop through all the nodes performing the following steps:

  1. Create a new Neuron ESB Message (Neuron.Esb.ESBMessage object), by cloning from the original request message using “context.Data.Clone()”
  2. If transform is required, set the Neuron Transform – Xslt Process Step’s dynamic message property using “SetProperty()”. This will enable the Transform – Xslt to dynamically look up the Xslt at runtime from Neuron’s in memory Xslt configuration store.
  3. Set the Action header of the Neuron ESB Message
  4. Set the Service header of the Neuron ESB Message (used by the Service Endpoint step)
  5. Create a new Neuron.Pipelines.PipelineContext object and add the ESB Message to it
  6. Add the new Neuron.Pipelines.PipelineContext + ESB Message to the List of Neuron.Pipelines.PipelineContext objects
Note: The Neuron.Esb.Internal.PipelineRuntimeHelper.ClientContext.Configuration object provides full access to all elements of the Neuron ESB configuration store. This store is loaded into memory by the Neuron ESB Service at startup. This same store is edited by using the Neuron ESB Explorer user interface.

Lastly, the Neuron.Pipelines.PipelineContext list is passed to the Execution Block:

return contexts;

By right clicking the Code process step named “Code Split” within the Split process step, the C# code fragment can be viewed as in Code Fragment 4:

//Create a list of process contexts to be filled and returned to the aggregator
List<PipelineContext<ESBMessage>> contexts = new List<PipelineContext<ESBMessage>>();
 
// Retrieve the list of services previously saved to the process context property bag,
System.Xml.XmlNodeList nodeList = (System.Xml.XmlNodeList)context.Properties["ServiceConfigurations"];
 
// Loop through the list of services, we'll create a new message from the original, adding the
// topic to the message's property bag so it can later be retrieved in the Steps block.
foreach(System.Xml.XmlNode node in nodeList)
{        
    Neuron.Esb.ESBMessage msg = context.Data.Clone(false);
    msg.Header.Action = node.SelectSingleNode("@action").InnerText; 
    msg.Header.Service = node.SelectSingleNode("@serviceEndpoint").InnerText;
         
    // if there is a translation required, retrieve the xslt and set the property
    if(!System.String.IsNullOrEmpty(node.SelectSingleNode("@transform").InnerText))
    {
        msg.SetProperty("neuron","xsltName", node.SelectSingleNode("@transform").InnerText);
    }
     
    //create new context for the message
    PipelineContext<ESBMessage> splitContext = new PipelineContext<ESBMessage> 
        (context.Runtime, context.Pipeline, context.Instance, msg);
    splitContext.Properties.Add("__ClientContext", Neuron.Esb.Internal.PipelineRuntimeHelper.ClientContext);
     
    // add the context to the result
    contexts.Add(splitContext);
}
 
// Return the batch of messages to be processed by the STEP block
return contexts;

Code Fragment 4: The Code Editing Window displays the C# code contents of the “Code Split” Code process step in the Split process step within the Scatter Gather process example displayed in Figure 2. Code can be edited by right clicking on the Code Split node and selecting the “Edit Code” short cut menu.

Note: SetProperty() and GetProperty() are methods of the Neuron.Esb.ESBMessage object. SetProperty() can be used to add or modify existing Neuron specific message properties. It can also be used to add custom or application specific properties (no schema required). A custom property can be added by passing in a prefix, property name and value. GetProperty() can later be used to retrieve the value of the property by passing in the prefix and property name.

Steps Execution Block

In the previous step, a new set of cloned messages was created from the original request message, some custom properties were added, a new Neuron.Pipelines.PipelineContext was created for each one, and then all were added to a list of Neuron.Pipelines.PipelineContext objects which were returned to the process runtime. Following this, the process runtime will either do one of two things, depending on the value set for the Synchronous property of the Split process step:

  1. If the Synchronous property is set to true, then the process runtime will loop through the set of new messages, executing all the process steps within the Execution Block. Each message will be processed, one at a time, completing all the steps in the Execution Block, before the next message can be processed. The processing of all messages is handled on a single thread. For the experienced developer, this is synonymous to a “Foreach” loop construct, whereas all the process steps within the Execution Block are essentially within a Foreach code block.
  2. If the Synchronous property is set to false, then the process runtime will use the .NET Thread Pool to pass each message, on its own thread, to an instance of the Execution Block. This is essentially synonymous to the parallel branch execution offered in most workflow designers.
Note: When the Synchronous property of the Split process step is set to False, you can set the maximum number of threads to use when processing the split message collection.

In the Scatter Gather process, the Synchronous property for the Split process step is set to false.

Once a message is delivered to the Execution Block (labeled “Steps”) several things happen, first and foremost a check is made to determine if the original request message must be transformed to a format required by the target service.

Requires Transform

The first process step in the Execution Block is a Decision process step, labeled “Requires Transform?”. This has two branches labeled “Yes” and “No”. The “Yes” branch is configured to return either true or false by evaluating for following inline C# code fragment:

return context.Data.GetProperty("neuron","xsltName","").Length > 1;

This code fragment retrieves the value of the Transform – Xslt Process Step’s “xsltName” property (where “neuron” is defined as the prefix) which was previously set on the ESB Message (represented by context.Data) in the Code Split portion of the Split process step. The Transform – Xslt Process Step optionally will use this property (if set) to dynamically load (by name) the Xslt stored in the Neuron configuration store and execute it. If this line of code evaluates to “true”, then all the process steps placed under the “Yes” labeled branch are executed for the ESB Message. If the line of code evaluates to “false”, then the “No” labeled branch will be executed. In the Scatter Gather process, the “No” branch does not contain any process steps

Execute Transform

If a transformation is required, control is passed to the Transform – Xslt Process Step labeled “Dynamic Transform” located under the “Yes” branch which will translate the original incoming request message (an ESB Message represented by the context.Data variable). In our example, XSLT is chosen to translate the message, the name of which was previously retrieved in the Code Split portion of Split process step.

Note: Although the method used in this example to translate the original client request message to the message format required by the service endpoint is XSLT, any method including but not limited to custom code, database query, and LOB query or business rules could be used.

Call Service

Once the transformation logic is completed, the Neuron ESB Message is passed to the Service Endpoint process step, labeled “Dynamic Service Endpoint”. The underlying purpose of this process step is to call the service endpoint, represented by a Neuron Service Connector. Like the Dynamic Transform step previously mentioned, the Dynamic Service Endpoint step is also configured by Code Split portion of the Split process step, when the msg.Header.Service property is set to the value of the serviceEndpoint attribute from the service configuration XML created at the beginning of the process.

Either a response is expected from the service called by the service endpoint step or a timeout will occur. Once the response message is received by the executing process instance, the process runtime will use the contents of the response message to replace the body of the current ESB Message being processed. This newly modified ESB Message will be used in all subsequent process steps.

Join

Once ALL the messages are processed by their respective instance of the Execution Block and its associated process steps, control is passed to the Join portion of the “Broadcast and Aggregate” Split process step. Within the Join, the process runtime can either reassemble all the individual Neuron ESB messages which were processed by the Execution Block back into a list of Neuron.Pipelines.PipelineContext objects, or discard them. How the Join functions is determined by the value of the “join type” property. In the Scatter Gather process, the join type is set to Wrapper, which allows configuration of the Join by setting two properties in property grid, WrapperElementName and WrapperElementNamespace. In our example the following property values were used:

WrapperElementName = QuoteCollection

WrapperElementNamespace = http://schema.neuron.sample/broadcast/result

This results in all the Neuron ESB Message bodies contained in the list of Neuron.Pipelines.PipelineContext objects to be combined and encapsulated by an XML root node of “QuoteCollection” with the default namespace of “http://schema.neuron.sample/broadcast/result”. An example of a result that the Join would produce from the original message request follows:

<QuoteCollection xmlns="http://schema.neuron.sample/broadcast/result">
       <QuoteResult>Response received from service endpoint 1</QuoteResult>
       <QuoteResult>Response received from service endpoint 2</QuoteResult>
</QuoteCollection>

Code Fragment 5: An XML document representing the aggregated result message returned from the list of services that were executed. Each service response is represented by a <QuoteResult> node. However, a service can return ANY XML data. Whatever data is returned is inserted as a child node under the QuoteCollection wrapper root element.

In Code Fragment 5, each <QuoteResult> node represents the returned response message from a service called through the “Call Service” Publish process step. In our example, if further modification to the aggregated result message is needed, the Split process step could be followed by another process step, perhaps either a Transform – Xslt, Code or Rules-WF process step.

However, more granular control over the collection of Neuron ESB Messages returned from the Execution Block can be obtained by changing the join type property from Wrapper to Code. This results in access to the Code Editor for direct modification of the collection. An example of C# code used within a Join is shown in Code Fragment 6:

using (System.IO.StringWriter sw = new System.IO.StringWriter())
{
    using(XmlTextWriter xw = new XmlTextWriter(sw))
    {
          //writes <QuoteCollection> root element
          xw.WriteStartElement("QuoteCollection");
 
        foreach(PipelineContext<Neuron.Esb.ESBMessage> c in splits)
        {
             //adds <QuoteResult>...</QuoteResult>
            xw.WriteRaw(c.Data.ToXml());
        }
 
          //writes </QuoteCollection>
        xw.WriteEndElement();
        xw.Flush();
    }
    //Replace original request message with new aggregated response message.
    Neuron.Esb.ESBMessage esbMsg = new Neuron.Esb.ESBMessage();
    esbMsg.FromXml(sw.ToString());
    context.Data = esbMsg;
}

Code Fragment 6: The code sample demonstrates how to aggregate the responses returned from all the service calls using an XmlTextWriter. Each service call response is represented by a PipelineContext object (c) within the splits collection. This is exactly what happens when using the Wrapper join type.

When using the Code join type, the original Neuron.Pipelines.PipelineContext (which contains the original unaltered client response Neuron.Esb.ESBMessage) as well the list of Neuron.Pipelines.PipelineContext objects processed by the Execution Block, are passed into the Code Editor as arguments represented by the variables “context” and “splits”. As shown in Code Fragment 5 above, entirely custom xml can be created to aggregate the response messages collected from the Execution Block by iterating through the splits collection using a foreach construct.

Lastly, to return the newly created aggregated XML response message to the remaining steps of the process, the Neuron.Esb.ESBMessage assigned to the current context (represented by the context.Data variable) must be replaced with a new Neuron.Esb.ESBMessage containing the aggregated XML.

Return Result

The final piece of the process is a Cancel process step named “Return Result”. This is where a decision will be made regarding what to do with the aggregated response message returned from the Join. In the Scatter Gather process example, a Cancel process step is used because the response message is being returned to the calling client. Using a Cancel process step essentially stops the response message from continuing beyond the process instance and being published to the bus. Instead, the response message is returned to the original calling client. This will happen AS LONG AS THE ORIGINAL REQUEST IS A REQUEST/RESPONSE TYPE OF REQUEST. This is configured by either manually setting the Semantic property of the original Neuron.Esb.Message to “Request” or automatically, by ensuring that the Neuron Client Connector’s Messaging Pattern property located on the Binding tab is set to “Request-Reply” rather than “Datagram”.

However if required, the response message can be forwarded to the original Topic, or another Topic by either removing this step, or replacing it with a Publish process step. In this latter case, the Neuron Client Connector’s Messaging Pattern should be set to Datagram, and the associated code to send the client request to Neuron (shown in Code Fragment 2) should be altered appropriately using the IOutputChannel (or similar interface) so as to avoid client timeout errors.

Configuring the Solution

The accompanying Neuron ESB Configuration named, ScatterGatherPipelineSample, is configured to support the Scatter Gather solution described in this paper. The configuration file can be opened within the Neuron ESB Explorer. Within it are the following elements:

Neuron Service Endpoints

Neuron Service Endpoints are used to either host service endpoints, (essentially using standardized web service protocols to expose a Topic publishing service to receive messages) or, to communicate to existing service endpoint urls, (Neuron sends a message from the bus to the specific service endpoint url). The former is regarded as a Neuron Client Connectors, whereas the latter is a Neuron Service Connector. Both are a type of Neuron Service Endpoint.

In the scenario outlined in this paper, the distributor submits a purchase request to the service endpoint hosted by Contoso.

To support this, the following Neuron Client Connector (representing the Contoso endpoint) is configured under the Connections:Endpoints:Service Endpoints section of the Neuron ESB Explorer:

General tab:          Name:               ContosoQuoteService
General tab:          Binding:            WSHttp
Security tab:         Security:           Message:Windows
Client Connector tab: URL:                http://localhost:9001
Client Connector tab: Processing Mode:    Business Process
Client Connector tab: Processes:          Scatter Gather
Client Connector tab: Service Attributes: Enable SOAP headers should be checked

The following Neuron Service Connectors, representing the existing Quote Services for each respective vendor, are configured under the Connections:Endpoints:Service Endpoints section of the Neuron ESB Explorer:

General tab:           Name:      NewMartQuoteService
General tab:           Binding:   BasicHttp
Service Connector tab: URL:       http://localhost:8732/
Service Connector tab: Subscribe: Should be unchecked
General tab:           Name:      OldMartQuoteService
General tab:           Binding:   BasicHttp
Service Connector tab: URL:       http://localhost:8731/
Service Connector tab: Subscribe: Should be unchecked

The service connectors are not enabled because they are being called directly from a Service Endpoint process step. If they were enabled, they would need to associated with a subscriber and would consume extra resources on the Neuron ESB server.

Neuron Data Store

XML documents, XSD Schemas and XSL Transformation documents can be persisted and referenced from the Neuron Data Store configured under the Data section of the Neuron ESB Explorer. XSD Schemas and XSL Transformation documents can be referenced directly within the Validate – Schema and Transform – XSLT process steps. Additionally, the Neuron Data Store can be accessed at runtime through theNeuron.Esb.Internal.PipelineRuntimeHelper.ClientContext.Configuration object to dynamically retrieve stored documents.

In this sample, the following XML Documents are stored only for reference. They are not retrieved at runtime.

Name:            ClientPurchaseRequest
Description:     Sample purchase request submitted by distributors to the Contoso web service
Name:            NewMartQuoteRequest
Description:     Sample Quote request message expected by the New Mart Quote Service
Name:            OldMartQuoteRequest
Description:     Sample Quote request message expected by the Old Mart Quote Service
Name:            QuoteResult
Description:     Sample Quote result message returned by both Old Mart and New Mart Quote Services

The following XSL Transformation documents are retrieved at runtime to transform the incoming purchase request to the expected format of the vendor’s Quote Service.

Name: QuoteRequest_To_NewMartQuote
Name: QuoteRequest_To_OldMartQuote

Running the Sample

Open the Sample

To open this sample, see the topic Using the Neuron Samples and select the Scatter Gather sample.

Setup the Quote Services

  1. In the Visual Studio project that opens, configure the solution to start all three projects and then build all the projects in the solution.

Run the Sample

  1. Use Visual Studio to start up all projects (press F5).
  2. Once all programs have connected to Neuron ESB, press Enter on the ContosoClientRequest to initiate communication. This will submit the request to Neuron.

Results

Once a distributor submits a purchase request to the Neuron ESB hosted service endpoint (Neuron Client Connector), the Scatter Gather process is executed. The process resolves the vendor service endpoints, transforms the original purchase request as necessary, sending it to individual Quote Services of New Mart and Old Mart

  1. A transformed purchase request will be written to the Console Application window hosting the NewMartQuoteService as in the figure below:
  1. A transformed purchase request will be written to the Console Application window hosting the OldMartQuoteService as in the figure below:
  1. Each vendor’s Quote Service will return a result. These results are aggregated within the process and returned to the distributor making the purchase request. This is represented by the result written to the Console Application window as in the figure below:
  1. The returned results from both Quote Services are enclosed within the QuoteCollection element and written to the Console Application window that made the original purchase request. This also includes the full WSHttp envelope.

Process Designer

All processes with the exception of the Code process step are configured by selecting and setting their properties in the property grid located at the bottom right of the process designer. The Code process step is configured by selecting the “Edit Code” option from the short cut menu that is available when right-clicking the Code step in the process designer. See the process documentation for more information.

Figure 3: The Neuron Process Designer displaying the Scatter Gather process. Property Grid at the bottom right displaying the properties of Split process step named “Broadcast and Aggregate”. Note the Synchronous property is set to False, allowing a broadcast into the Execution Block (labeled “Steps”) for processing rather than each message processed one at time”.
Was this article helpful?
Dislike 0
Next: Failed Message Routing