Monday 19 October 2020

Distributed state management — refresher

  • Ease of Stateless Services

    Designing stateless distributed systems are relatively easy. You would have raised an event/message once the service had done processing. You typically are not worried about how the other systems consume your data. In fact in the majority of these scenarios, you don’t care what happens to the message/event after you were done.

    Think of fire and forget. This could be easily achieved using typical message broker queues & topics. Sadly, this stateless approach is quite bookish and not practical for most enterprise application needs.

    Pains of Stateful Services

    State could be as straightforward as an entity-update which another service is dependent on or a bit more trickier one as an amount being debited from one account and credited to another. This information/state needs to be maintained somewhere.

    In 2 / 3 tier applications, we had a central transaction/state coordinator/server through which all transactions must flow. In case of errors, it was the task of this coordinator to rollback all the child tasks relevant in the transaction context.

    One of the problems with this approach was about a single point of failure (SPoF) — what if this server went down — what happens to the workflow/process state ?

    In distributed architectures, we would require a distributed state log which is virtually “centralized”, but practically distributed across nodes/pods/VMs for enabling both availability and scalability.

    One of the main challenge raised in this distributed situation is state consistency :

    • How do we make the state synced across the nodes such that all nodes return the same state even if queried separately? — strong consistency.
    • Staleness/freshness — Is the state returned older than the state at other nodes ?
    • During a retry operation (say when part of the workflow failed), what is the impact of executing the service again ? Processing duplicate messages should not affect the underlying entity ever — think of 2 debit messages against the same account during a retry. Is the old state overwritten with new or should it be ignored ?

    Approaches

    There are many approaches employed these days for state management :

    Coordinators/Orchestrators : a set of system(s) that manages the state — think of an orchestrator in a music symphony.

    • Distributed locks : What if nodes in a cluster can elect a leader among themselves such that leader is the only one that can change the state. Paxos/Chubby/Raft being some of the prominent algorithms with many implementations.
    • 2-Phase commit : Employed mostly in systems migrating from 2 / 3 tier to the cloud; requires two cycles of requests across ALL participants — First cycle preparing the for commit — PrepareForCommmit and second the commit itself — CommitNow.
    • Eventual Consistency : state as received is processed as long it’s more newer than the one it is aware. All nodes are NOT required to be aware of the most recent state at any point in time. Once all of the applicable process completes, the eventual state is available in the DB/Cache. The UI depicts information available to it, though stale. For a specific workflow, UI would depict states as ‘InProgress’ and later as ‘Completed’ once all the tasks in the workflow are identified as completed or after a timeout. This approach is applicable for information that is not too business critical, where clients are OK with information that is stale.
    • Optimistic/TimeStamp based : If the timestamp of the state received is newer than the one available in the data store, the node applies it as a new state. As in most cases, locks might need to be applied on the DB record to make sure no one else is simultaneously applying.
    • Event Source Logs — EventSourcing depends on an append only store for all events. This distributed event-log could in fact be used to derive the state of an entity at a point in time by evaluating the events until a point in time. Snapshots of the state at a point in time can definitely speed up this evaluation of state.

    Depending on the consistency requirement, it’s usually a mix of the above approaches used for distributed state management.

    Technology Options

    Writing frameworks/libraries from scratch that address above challenges are complex and not recommended unless you are a software company with serious software research focus.

    Across the spectrum, there are interesting frameworks and stacks that can assist the ‘common engineer’ (derived from ‘common man’). As each framework has their (dis)advantages for adoption, based on the technical capability of the team, hybrid solutions too can be looked into.

    • Azure Durable Functions based orchestration : OrchestrationTrigger, when applied on services, exploits the capabilities of Durable Azure Functions that automatically orchestrate the state. The fundamental idea being, the context/state is available for all of the functions(services) participating in the workflow. For developers, this is like writing a 2 tier application with a single try-catch block to handle any error/state across libraries. Instead of libraries, you are calling services with the entire context of execution available to you across the services. From the AWS world, we now have the Step Functions which behaves the same.
    • Reliable Actors, Reliable Collection built over Service Fabric/Azure Service Fabric (SF) support orchestration for getting/updating state information across the distributed nodes while hiding away the intricacies on how nodes internally communicate and keep themselves in sync. This is a pretty good option if you have no plans to support multi cloud. Though it’s conceptually possible to host SF onto AWS, no assurance on the level of compatibility today. Possibly satisfying CP in CAP.
    • Orleans research project from Microsoft needs a special reference here as both SF and Orleans are based on the actor model though the design is different.
    • Akka and related Akka.Net : An actor model where a conductor/parent actor is internally aware of its child actors within a cluster. The model can be exploited for various distributed state management together with its supported persistent actor and singleton model. Compared with the Service Fabric model, SF does not support this parent-child relationship. (PS: if you are from a .NET background, do check Akka.net. Further ahead, together with Azure, Akka.net deployed on AKS pods is a cool experiment for massively scalable state management needs). CAP is deterministic here based on the storage/persistence store selected.
    • Dapr.io : Supporting strong consistency (all nodes must be in sync) for the state management, Dapr is deployed typically as a sidecar and does not disturb the service code (unlike Akka.net where runtime is part of the service). (do check out similar service mesh offerings like Istio too)
    • Kafka Streams , HazelCast Jet : acting as orchestrators, these stream processing engines have intrinsic support to make sure a message in the cluster is processed ‘exactly-once’. This out of the box feature can be exploited to manage state as you don’t need to worry how the set of nodes are talking with each other to reach an agreement internally. Intricacies on how the nodes in the cluster message each other over queues to reach an agreement is completely abstracted away. Possibly satisfying CP in CAP.
    • Axon, Eventuate.io, Camunda , Netflix Conductor : similar to the way above ServiceFabric/Akka/Kafka streams function, these too hide away the internals of inter node state synchronization and could be looked into.
    • NServicebus : the supporting framework requires an explicit transaction start call such that it can handle rest of the related messages in the transaction. Internally it can lock all related messages for a forced sequencing like a funnel in case there are too many consumers. Possibly satisfying CP in CAP.
    • Redis Locks : Use Redis for achieving a distributed lock before changing state.

    It’s highly recommended to check with your architect team who could weigh-in the features while considering the characteristics/NFR’s and KPI’s of your system. The underlying storage/persistence later for each of the above set of frameworks/services directly effect the CAP. In cases the framework allows for choosing a persistence store, must review whether its CP or AP of the CAP that you are planning to satisfy for your service.

    References

Tuesday 13 October 2020

M&A and TOGAF

During an interesting discussion online on Mergers & Acquisitions, a basic question arose — if we consolidate technologies & tools used across the two merging companies, would it suffice most of the Architecture needs for the new company ?

Maybe; but in most cases, No.

A more formal approach is required to make sure we do not end up with a half cooked chowder served in a platinum goblet. We need means to formalize a recipe that takes care of most the business & stakeholder concerns while making sure we have added essential quantities of innovation and budget to the recipe.

What if we could exploit learning from TOGAF and its 4 domain pillars (BDAT) as the base line?

As the first essential requirement, management, stakeholders and technology leaders must define and agree upon an Architecture vision. The vision must represent the desired state of Architecture that cuts across the BDAT (Business, Data, Application and Technology) pillars. Furthermore, vision must act as the means to communicate with other partner leaders on where the new company is headed in the next 3–5 years.

Think of vision as a simple but appealing menu at the Michelin starred restaurant — — just enough to interest the diner. For typical small to medium enterprises going through a merge, think of at least 2–3 months to define this, as this would essentially become the guiding star for the rest of the architecture detailing exercise in the coming days.

Once we have defined the vision, it’s critical to have the current state of architectures ranked against the vision (think of maturity models). This could be as simple as 1,2,3,4,5 with the vision ranked at 5, while current architectures at rank 1; especially if we just adopt everything from both companies as-is before going through the below exercise.

Each cycle of the TOGAF ADM in the coming months should help us get to rank 5 as we reassess our rank periodically — every quarter/year. This is similar to Michelin Star 1 going until 3.

Following the TOGAF ADM is quite perfect for our need while detailing each of the BDAT pillars.

Business (Common processes which are procurement, operations …), Data (kind, tools, policies …), Application (Toolsets, policies) , Technology (Service Registry, SOA, micro services, neural networks…) pillars require many viewpoints to be created as required.

In addition, typical cross cutting viewpoints like Devops, Infra, HR, too must be assessed and detailed during the ADM.

Carrying ahead with the BDAT definition, ADM does provide means to define the governance model (who, how, what) and when/who can change/refine the governance model itself.

Now is the perfect opportunity to define the road map for the next couple of years for the merged company that also helps better the targeted architecture rank.

As we observe, ADM does force us into absorbing a formal mechanism to identify the perfect recipe for our new architecture. ADM compels us to look into opportunities (even across innovation programs active in the two companies) that could pop up during the merger that could further lead to defining new business processes/tools/use cases/products too.

Once the first cycle of the ADM is complete, we could have reference enterprise architectures that partner businesses can consider. All documentation including the reference models, process changes, view points, governance models, recipes�, principles could now be captured in the TOGAF enterprise continuum.

Soup is now being served. This time it was well cooked and served in a proper china soup bowl.

EventChain

Applying Blockchain to Event Sourcing

Event Sourcing pattern at the core requires an event store to maintain the events. What if we add these events as it arrives into a blockchain ? This should effectively make sure the events have not been tampered with. The plan would be to initiate typical blockchain mining after which the event is added to the “block-chain of events” — an “EventChain”.

The definite side effect is that until the mining is complete, the business transaction cannot be internally marked as complete. Considering the time typically taken for mining, this would probably be an offline job.

Tamper Proof

The typical challenges faced by organizations who employ event sourcing and the event store is about securing the events. What if the DB admin for the event store manages to inject/remove events ? The replayed events and resulting projections are no longer valid in this case. Event chains should solve this issue for typical event stores.

Exploit the distributed infrastructure.

For private event chains , where businesses do not want the chain nor events to be exposed, existing distributed systems/hosts can be exploited for mining. Your event store DB cluster hosts, event sourcing services hosts, API hosts, cache cluster hosts and others that are spread across geography could be exploited for the same.

GDPR Challenges

There are cases where regulations require personal data to be removed from all data stores. In our case, this is about removing the related set of events from the event chain. Without the event chain, removing events from the event store was quick and easy.

Resetting the event chain when events are required to be deleted is challenging especially if there have been many events after the event(s) in concern. This would require re-mining the rest of events after removing the event(s) that had personal data all the way down to the most recent event. As this is an extremely time and compute intensive operation, it’s not recommended to store events that contain personal data in the event chain.

Snapshots

As the events from the event store can be played back to recreate a state at a point in time (“projections”), we could in fact have “snapshots” to identify a specific projection in time. We could link this snapshot as a child branch to the main event chain tree such that it’s not required to recalculate the projections each time; while making sure the projections themselves have not been tampered with.

We could look at having many child branches/trees for the different filters/conditions too.

Monday 12 October 2020

Kafka Streams has an edge over Service Fabric ?

Compared against the .NET/Azure offerings, the level of abstraction enabled by Kafka Streams for event processing while exploiting underlying Kafka message-topic-queue patterns is pretty neat. 
 
Did come across an interesting framework that used C# libraries over Kafka Streams by @tonysneed in GitHub too here : https://wp.me/pWU98-1v2
 
Hope Service Fabric Mesh Reliable Actor or similar offerings from Azure catches up with Kafka Streams in terms of seamless integration for distributed event processing.
 
For a start, assuring messages are processed 'exactly-once' is a basic requirement for most distributed systems. Yet to come across native frameworks in the .NET world that use Azure/Akka.NET streams/Service Fabric Mesh or the likes that enable essential distributed capabilities like 'exactly-once' and others with minimal developer effort :!

#azure #kafka #confluent #kafkastreams #eventsourcing #akka #distributedcomputing #cloudarchitecture

Saturday 29 June 2019

Software Engineering lost in the cloud?

It would seem the cloud is making you a lazy software engineer. Engineers these days are now have a ready answer for most of the architectural and design concerns - "its taken care at the cloud". This perception is scary and appears to makes any tom-dick-harry engineer with minimal to zero computer/software knowledge "become" "master" software-engineer overnight. This halo is bothering. Whatever happened to clean code / patterns essential to designing your software during the days of distributed computing setup in local clusters ? Perhaps none today cares about minimizing traffic across nodes and syncing time across nodes nor time sharing and optimizing resources during your minimal time at the node. Not sure the solution for this until you are choked to become yet another Harry. hashtag

Saturday 12 January 2019

ServerFULL deployments

Moving away from Typical service deployments

Rather than have services typically tied to a set of machines and load balanced as-is today in the SOA/SaaS/Microservices world, what if we could just throw a set of servers and get them be assigned/allocated dynamically and more specifically, attain tight packing of services on the same hardware ?

Though mostly exploited on the Cloud with AWS Lamdas and Azure Functions, Serverless as a pattern are awesome for OnPremise deployments too. An interesting set of options for ServerLess OnPremise is available at this list. Though its quite a misnomer in cases of OnPremise deployments where we really need to bother about extreme and efficient hardware utilization of the server, it is preferable to call this approach as ServerFULL as the desired effect is to be fill up the server to the FULL ;)

Once the Docker Images/perhaps later Memory Images are available in shared In-memory/SSD drives, any of the machines/VM could be dynamically chosen for deploying the service and finally un-deployed down once done, allowing the space for the next.

OpenFaas/OpenWhisk seem to be on top of the list with both exploiting Docker containers. Though there is still constraints on elasticity (bringing up new VMs that finally run the Containers is time consuming, while adding more physical machine could take days), it is still an exciting means to efficiently exploit what is available on-premise in the moment.

Just like in Serverless world on the cloud, Services that consume high resources (CPU/RAM) for long duration and the ones that comparatively take higher time to spawn, might not be a candidate for being in the ServerFULL environment as these tend to block up the VMs/containers for long.

Think of designing typical business workflows with events, triggers, logic, nested flows and actions that span in/out, with these getting mapped into services by developers and further mapped to the ServerFULL world of machines dynamically - quite exciting times.

References: 


  1. https://martinfowler.com/articles/serverless.html
  2. https://winderresearch.com/a-comparison-of-serverless-frameworks-for-kubernetes-openfaas-openwhisk-fission-kubeless-and-more/


Wednesday 11 July 2018

Structural Imbalance - In Software Systems


We come across many instances in the industry where "code-lumps" get deployed as software services/products with a beautiful UI included to cover up all the ugliness underneath. The design document too look fancy with usages of software patterns neatly listed. After all this stunt, these modules end up with a short life-span and before long, there are in-numerous critical issues being raised. 
In majority of these cases, product owners were forced into releasing these "code-lumps" that just weren't ready, while in other cases the anointed "architect" had no clues to why the "code-lumps" exist and why the pattern was used. At the first look, the software does appear to function as desired with all the components "working" great in the demos.

How could these be avoided in the first place ?

Just like in typical broken buildings we see across the road, structural imbalance refers to modules that doesn't making sense together. Individually, these chosen components / patterns appear perfect for the problem at hand but they just don't sync enough; structurally.
Right from a birds-eye/logical view to the drilled-down/code view, its critical that a dedicated team of architects reach consensus on the many choices being made every day by engineers.
Only if the team of architects had identified the applicable Non-Functional-Requirements (NFR) and defined them initially. Architects and the team of software designers could drill down into one or modules for a detailed design before coding. 
Though check-ins could be allowed from all engineers, none of it should reach the release pipeline until all the "code-lumps" were removed. Architects & designers must agree that the code comply towards the agreed NFR before promoting the code up its life-cycle.
Working closely with the architects, the product owner would now be more confident in communicating with the stake holders.
Do look forward related article on "Why all software engineers must NOT automatically become an 'Architect' " ; which in addition to looking at skill & interest, also touches upon the essential philosophical outlook required by any upcoming architect.

Friday 9 August 2013

Self-optimization in Distributed caches.

Self-optimization in Distributed caches.


Distributed caches are systems where the cache data/objects are stored across distributed nodes/machine. When a data is stored/retrieved by the consuming application, one or more of systems in the distributed system serve the request. This paper attempts to identify self-optimization techniques that could be applied to this distributed cache. For a base implementation of the distributed cache, the open source project HoC (herd of cache @ http://hoc.codeplex.com) is referred. This project implements the distributed cache in .NET using the concepts of consistent hashing.
Self-Optimization in distributed computing refers to the capability of distributed systems to optimize independent of any intervention - machine/human. In a typical de-centralized and cooperative system like HoC, this means the nodes in the distributed cache can make decisions either independent or together. The latter would require the use of various consensus algorithms to be applied by this distributed cache.

Self-Optimization: Candidate Use Cases:
1.) Optimization of node load: decision made by internally by hosting nodes
In a typical consistent hash implementation, there is possibility that the number of objects stored in the cache of some of the nodes are high compared to the other neighboring nodes. This requires some of the data to be moved to the neighboring nodes. This would first include a node first asking the neighboring node for its load. If it detects that the total count of self is considerably higher, it would apply a partitioning of the objects stored and move the selected objects.
Locating an item in the cache would require multiple hops to reach the target node where the data is stored. Whenever a node gets a request for an item that has been moved to a neighboring node, it would require the call to be routed to the neighboring node. Each node is expected to maintain a list of objects that were moved and the target neighboring node to which the object was moved.
During each fetch, the path/nodes traversed to reach the target node could be returned back to the caller such that the next call to the same object directly calls the target server while avoiding the intermediary traverse across nodes.
The end result of this approach would be a more balanced store of objects across nodes.

2.) Self-Optimizing Consistent Hash Algorithm for load balancing
In a consistent hash implementation, similar to a hash bucket, the target node is selected based on the hash key returned by the underlying hashing algorithm. A typical problem would be that the data could get collected more at a specific server.  An alternative approach to solution 1 indicated above would be to apply machine learning approaches such that the change/adjustment -> fn(load distribution) required to adjust the hash algorithm can be identified. In this case, it should be noted that the fn(load distribution) required to normalize the overall load is specific to each system. A pattern could be detected for a specific system/installation and the load pattern for this system could be derived.
Applying this change to the underlying hash key algorithm would require a possible reset of the distributed system. Once reset, the adjustment learned/deduced by the system => fn(load distribution) would need to be applied each time a new object requires to be saved/retrieved. This adjustment function itself could be tweaked further down the time automatically by the system such that a new adjustment function is derived for the next run.
To monitor the overall usage pattern / load across nodes, it would be required to have a data store where the node v/s storage vs. load factor could be stored. Each data stored into the cache system would require its statistics to be stored into this data store. The next reset would require fn(load distribution) to be derived and applied to the underlying hash algorithm such that the load is more spread out in the next run.
This optimization technique assumes that the kind of data including its type, format, locale etc. does not vary considerably across resets.
3.) Optimized resource utilization on nodes
The CPU, RAM and other resources of each nodes would need to be used in a highly optimized fashion. Assuming these are not dedicated nodes, but machines shared by other processes too, it would be required to make sure the cache service does not overuse/bloat the machine resources. Optimized usage would require continuous monitoring of usage of these resources and adjusting the internal parameters accordingly. These parameters could be thread counts, memory allocated from heap, priority of thread/process (to free up CPU), receive/send buffer etc.
Each node should have capability to derive the optimal usage of resources on a continuous basis and refined after each optimization run. Parameter dependence (e.g.:- thread priority v/s memory) would be a factor that would need to derive again based on basic statistical record of resource usage. If the nodes are similar in deployment, learning from individual machine/node could be shared among other nodes.
4.) Optimization of node hit rate using duplicate stores.
If its seen that the hit rate of particular object/s is high on a specific node, it would be desired to have duplicates of the same object stored across nodes or across duplicate nodes such that a virtual relay/routing mechanism could be employed to divert the underlying request call. A virtual software relay could be employed just before this set of nodes such that it could route/direct to one of the clone/duplicate nodes. This mechanism assumes custom relay code that determines if the data has been duplicated and then diverts accordingly.
For this self-optimization, the systems needs to have a knowledge base that knows whether a duplicate item is being stored and its hit rate. Each node would need to determine based on the object hit rate in a time duration on whether to duplicate this object. In addition to basic object hit rate frequency, the system can learn from patterns in object usage – a specific group of objects might see high hit rate during Mondays and the system might assign duplicate nodes automatically on Mondays based on the learned hit rate pattern.
This method of store can be exploited as a disaster recovery option too. If one of the node in the duplicate set goes down, we are assured that the system continues to work as the service can now be taken care by the other nodes in the duplicate set.
5.) Optimization for near geography store.
Enterprise applications hosted on the cloud today are distributed on a global scale and when distributed caches are hosted on a cloud, it would be desired to have the most commonly used items near to the consumer geographically.
Dynamic cache clusters (not just cache groups, but cache within a cache in a consistent hash implementation) wherein each target node internally maintains another set of distributed cache could be employed. The dynamic cache cluster creation would be based on the geo usage statistics and would require the nodes to group themselves into a cluster and allocate one of it as a node in the parent cluster.
E.g.:- when the usage across Bangalore is seen to be high for a specific object, this object could be moved to a cluster/node near Bangalore. Internally routing tables would need to be updated accordingly to now point to the new target node.
More than likely, in typical implementations, it would be required to derive geo usage statistics for a group of objects rather than independent objects. The group of objects could be based on an ID or even a derivative function of a record.
6.) Optimized Network utilization
Similar to point 3, optimal usage of network is of high importance in any distributed system.  Whenever a routing happens (cases 5, 4, 1 mentioned above), each node could internally maintain a spanning tree with weightage of paths, with weightage directly reflecting the historical usage of that particular network path for a better optimized usage of the network. Physical routers could be programmed to use a specific path based on learning by each node.

Highly optimized Systems

Highly optimized caches would require one or more of the above strategies to be applied together wherever applicable. This would also require the fn(optimization parameters) to be derived on the go by the system independent of any additional input. 

Sunday 26 May 2013

Need of the hour : Strong mobility in .NET based distributed applications.

Typically, distributed application written in .NET are either code distributed (assemblies moved across machines/nodes) or data distributed (data serialized across machines/nodes). What might additionally be required is strong mobility of process/thread/ TPL task/fiber etc. This effectively means serializing a task/thread with its execution context too. Once this is implemented, this means that a thread/method/delegate function would theoretically be executed across machines, with the context flowing across machines seamlessly and the final result getting back to the starting node.

Though custom framework could be written to emulate strong mobile process by building over the underlying OS provisions, it would require the following :

a.) serializable custom execution context store for each mobile process
b.) serializable data store store for all data - should be straightforward.
c.) serializable code that is independent, with minimal distributed locks and stateless as possible.

Point 1 requires the framework to have provision for events for each context switch and mechanism to indicate back to the local execution system not to proceed with the current execution object as it has been machine switched / context distributed.

All this requires very low level support for the execution runtime and maybe at an OS level too. Given the overall direction of distributed computing and the cloud, this becomes a necessity.

Saturday 25 May 2013

Message ReSequencing in a Distributed Publisher-Subscriber System

Message ReSequencing in a Distributed Publisher-Subscriber System

In a typical cloud based distributed environment with many message publishers and subscribers, the message could be processed by any of the subscriber and this is usually not predictable. In certain scenarios, it could be necessary to have a group of messages processed in sequence though they reached the subscribers randomly.
One solution to this is described below. The solutions tries to satisfy the following requirements :
1.) Make sure there is no tie-up / hard-link between subscribers and publishers: Any message can be received by any subscriber and any publisher can push any message.
2.) When the messages are not grouped, the system continues to behave as usual / before.
3.) When group messages are detected at subscriber, change in overall processing time should be minimal.
4.) When group message are detected at subscriber, no blocking operation should be performed and the subscriber should continue to be available for receiving and processing other messages.
Each group message is expected to contain the following items in addition to the message pay load itself :
            a.) Group Message ID.
            b.) Total Number of Messages in group
            c.) Group Message Sequence Number.
e.g.:-
            a.) GROUP_1
            b.) 4 //GROUP_1 contains 4 messages in total
            c.) 2 //this means that this message is 2nd in the group GROUP_1.
This solution employs subscribers working in a distributed cooperative manner. As a group message is received by a subscriber, it queries the distributed hash table to check if any other subscriber is working on the same group number. If yes, the message received is pushed onto that subscriber. (A push endpoint is expected to be available for each subscriber. This list of end points too could be maintained in the distributed hash table indicated earlier.) If there is no entry in the distributed hash table for this group, the subscriber adds itself into the distributed hash linking the message group id with itself.
GroupHash[GROUP_1] = SUBSCRIBER_ID
Additionally, the message received is added into a local data structure/bag of the receiving subscriber.
When a new message is received in the group_message_queue, the following steps are executed by the watcher@subscriber owning the bag.
            a.) check if all messages for the group has been received.
If all messages have been received, the messages are sorted based on the Group_Message_Sequence_Number and processed one after the other or as the logic demands for the group. Distributed hash entry is cleared for this group GroupHash[GROUP_1] = "" once the processing of message group is complete.
The data structure/bag maintained by the subscriber would be typically filled in the following scenarios :
            a.) message pushed from another subscriber.
            b.) message pushed by the local listener since no other subscriber is working on this group.
Partitioning within groups can be employed if required by employing the same strategy for sub-groups. In this case, it could be required by subscriber to wait for the sub-group messages to be processed before the proceeding with the group messages.
The system can be enhanced such that the subscribers internally check the load of active group message subscriber (as pointed by GroupHash) before pushing the message. This way, the subscriber that received the message can take over the ownership of group message voluntarily; especially if the message received was the last of the group message expected, requiring a message process flow.