Friday 9 August 2013

Self-optimization in Distributed caches.


Distributed caches are systems in which the cached data/objects are stored across distributed nodes/machines. When the consuming application stores or retrieves an item, one or more nodes in the distributed system serve the request. This paper attempts to identify self-optimization techniques that could be applied to such a distributed cache. As a base implementation of a distributed cache, the open source project HoC (herd of cache @ http://hoc.codeplex.com) is used as a reference. This project implements a distributed cache in .NET using consistent hashing.
Self-optimization in distributed computing refers to the capability of a distributed system to optimize itself without any external intervention, machine or human. In a typical decentralized and cooperative system like HoC, this means the nodes in the distributed cache can make decisions either independently or together. The latter would require the distributed cache to apply a consensus algorithm.

Self-Optimization: Candidate Use Cases:
1.) Optimization of node load: decision made internally by hosting nodes
In a typical consistent hash implementation, it is possible that some nodes store many more objects in their cache than their neighboring nodes. This requires some of the data to be moved to the neighboring nodes. A node would first ask a neighboring node for its load. If it detects that its own object count is considerably higher, it would partition its stored objects and move the selected objects to that neighbor.
Locating an item in the cache could then require multiple hops to reach the node where the data is stored. Whenever a node gets a request for an item that has been moved to a neighboring node, it would need to route the call to that neighbor. Each node is therefore expected to maintain a list of the objects that were moved and the neighboring node to which each was moved.
During each fetch, the path/nodes traversed to reach the target node could be returned to the caller so that the next call for the same object goes directly to the target server, avoiding the intermediate hops.
The end result of this approach would be a more balanced store of objects across nodes.
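The neighbor-based rebalancing and forwarding described in this use case can be sketched as follows. This is a minimal, single-process illustration and not HoC code: the class CacheNode, the threshold of 1.5 and the "move the first keys in sort order" partitioning rule are all assumptions made for the example.

```python
class CacheNode:
    """Hypothetical cache node that can shed load to one neighbor."""

    def __init__(self, name):
        self.name = name
        self.store = {}       # key -> value held locally
        self.moved = {}       # key -> neighbor node that now holds the key
        self.neighbor = None  # the single neighbor this node may offload to

    def load(self):
        return len(self.store)

    def rebalance(self, threshold=1.5):
        """Move surplus objects to the neighbor if this node is overloaded.

        Returns the number of objects moved. The threshold is an assumed
        tuning knob: rebalance only if our load exceeds threshold * neighbor load.
        """
        other = self.neighbor
        if other is None or self.load() <= threshold * max(other.load(), 1):
            return 0
        surplus = (self.load() - other.load()) // 2
        # Assumed partitioning rule: move the first keys in sort order.
        for key in sorted(self.store)[:surplus]:
            other.store[key] = self.store.pop(key)
            self.moved[key] = other  # remember where the object went
        return surplus

    def get(self, key, path=None):
        """Return (value, path), where path lists the nodes traversed."""
        path = (path or []) + [self.name]
        if key in self.store:
            return self.store[key], path
        if key in self.moved:
            return self.moved[key].get(key, path)  # forward to the neighbor
        return None, path
```

A caller that receives a path such as ["A", "B"] could cache the last entry and contact node B directly on the next request for the same key.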

2.) Self-Optimizing Consistent Hash Algorithm for load balancing
In a consistent hash implementation, similar to a hash bucket, the target node is selected based on the hash key returned by the underlying hashing algorithm. A typical problem is that data could accumulate on a specific server. An alternative to approach 1 above would be to apply machine learning so that the change/adjustment -> fn(load distribution) required to adjust the hash algorithm can be identified. Note that the fn(load distribution) required to normalize the overall load is specific to each system. A pattern could be detected for a specific system/installation and the load pattern for that system could be derived.
Applying this change to the underlying hash key algorithm would possibly require a reset of the distributed system. Once reset, the adjustment learned/deduced by the system => fn(load distribution) would need to be applied each time an object is saved or retrieved. The system could automatically tweak this adjustment function over time so that a new adjustment function is derived for the next run.
To monitor the overall usage pattern / load across nodes, a data store would be required where node vs. storage vs. load factor could be recorded. Statistics for each item stored into the cache system would need to be written to this data store. At the next reset, fn(load distribution) would be derived and applied to the underlying hash algorithm so that the load is more evenly spread in the next run.
This optimization technique assumes that the kind of data, including its type, format, locale etc., does not vary considerably across resets.
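One simple, non-learning stand-in for fn(load distribution) is to change how many virtual points each node owns on the hash ring, in inverse proportion to the load observed in the previous run. The sketch below assumes this weighting scheme; WeightedRing, adjust_vnodes and the use of SHA-256 are illustrative choices, not part of HoC.

```python
import bisect
import hashlib


def _hash(text):
    """Map a string to a large integer position on the ring."""
    return int(hashlib.sha256(text.encode("utf-8")).hexdigest(), 16)


class WeightedRing:
    """Consistent hash ring where each node owns a configurable number of points."""

    def __init__(self, vnodes):
        # vnodes: node name -> number of virtual points on the ring
        self.vnodes = dict(vnodes)
        self._points = sorted(
            (_hash(f"{node}#{i}"), node)
            for node, count in self.vnodes.items()
            for i in range(count)
        )
        self._keys = [point for point, _ in self._points]

    def node_for(self, key):
        """Return the node owning the first ring point after the key's hash."""
        idx = bisect.bisect(self._keys, _hash(key)) % len(self._points)
        return self._points[idx][1]


def adjust_vnodes(vnodes, observed_load, min_vnodes=1):
    """Assumed adjustment: scale each node's points by mean_load / node_load.

    Overloaded nodes lose ring points and underloaded nodes gain them.
    Nodes with no recorded load get their point count doubled.
    """
    mean = sum(observed_load.values()) / len(observed_load)
    adjusted = {}
    for node, count in vnodes.items():
        load = observed_load.get(node, 0)
        factor = mean / load if load else 2.0
        adjusted[node] = max(min_vnodes, round(count * factor))
    return adjusted
```

The adjusted point counts would only take effect when a new ring is built, which matches the reset described above: keys may map to different nodes after the change.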
3.) Optimized resource utilization on nodes
The CPU, RAM and other resources of each node would need to be used in a highly optimized fashion. Assuming these are not dedicated nodes but machines shared with other processes, it would be necessary to make sure the cache service does not overuse the machine's resources. Optimized usage would require continuously monitoring these resources and adjusting internal parameters accordingly. These parameters could be thread counts, memory allocated from the heap, priority of the thread/process (to free up CPU), receive/send buffer sizes etc.
Each node should be able to derive its optimal resource usage on a continuous basis and refine it after each optimization run. Parameter dependence (e.g. thread priority vs. memory) is a factor that would need to be derived from basic statistical records of resource usage. If the nodes are similar in deployment, learning from an individual machine/node could be shared with the other nodes.
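As a rough illustration of monitoring a resource and adjusting a parameter, the function below proposes a new worker thread count from recent CPU utilization samples. The function name, the 40% / 80% bands and the step sizes are assumptions chosen for the example; a real node would have to derive them from its own usage statistics.

```python
from statistics import mean


def next_thread_count(current, cpu_samples, low=0.40, high=0.80,
                      min_threads=1, max_threads=64):
    """Return an adjusted worker thread count.

    cpu_samples holds recent machine-wide CPU utilization values in 0.0-1.0.
    Above the high band the count backs off by about 25%; below the low band
    it grows by one thread; otherwise it is left unchanged.
    """
    if not cpu_samples:
        return current
    avg = mean(cpu_samples)
    if avg > high:
        proposed = current - max(1, current // 4)  # back off quickly
    elif avg < low:
        proposed = current + 1                     # grow cautiously
    else:
        proposed = current
    return max(min_threads, min(max_threads, proposed))
```

Backing off faster than growing is a deliberate choice here, since the machine is assumed to be shared with other processes.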
4.) Optimization of node hit rate using duplicate stores.
If it is seen that the hit rate of particular objects is high on a specific node, it would be desirable to store duplicates of those objects across nodes or across duplicate nodes so that a virtual relay/routing mechanism could divert the incoming requests. A virtual software relay could be placed just in front of this set of nodes so that it routes each request to one of the clone/duplicate nodes. This mechanism assumes custom relay code that determines whether the data has been duplicated and then diverts accordingly.
For this self-optimization, the system needs a knowledge base that records whether an item is stored in duplicate and its hit rate. Each node would need to decide, based on an object's hit rate over a time window, whether to duplicate that object. In addition to basic hit rate frequency, the system can learn from patterns in object usage: a specific group of objects might see a high hit rate on Mondays, and the system might assign duplicate nodes automatically on Mondays based on the learned hit rate pattern.
This method of storage can be exploited as a disaster recovery option too. If one of the nodes in the duplicate set goes down, the system continues to work because requests can be served by the other nodes in the duplicate set.
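The hit rate decision and the relay could look roughly like the sketch below. HitRateTracker and Relay are hypothetical names, the sliding time window and fixed threshold are assumed policies, and round-robin is just one possible way for the relay to pick a duplicate node.

```python
import itertools
from collections import defaultdict, deque


class HitRateTracker:
    """Counts hits per key inside a sliding time window."""

    def __init__(self, window_seconds=60.0, threshold=100):
        self.window = window_seconds
        self.threshold = threshold
        self._hits = defaultdict(deque)  # key -> timestamps of recent hits

    def record_hit(self, key, now):
        hits = self._hits[key]
        hits.append(now)
        # Drop hits that have fallen out of the window.
        while hits and hits[0] <= now - self.window:
            hits.popleft()

    def should_duplicate(self, key):
        """True once the key has reached the threshold within the window."""
        return len(self._hits.get(key, ())) >= self.threshold


class Relay:
    """Round-robin relay over the nodes that hold a copy of a key."""

    def __init__(self):
        self._cycles = {}  # key -> endless iterator over replica nodes

    def set_replicas(self, key, nodes):
        self._cycles[key] = itertools.cycle(list(nodes))

    def route(self, key, default_node):
        """Pick the next replica for a duplicated key, else the default node."""
        cycle = self._cycles.get(key)
        return next(cycle) if cycle else default_node
```

Timestamps are passed in explicitly so the window logic can be exercised without a real clock; a node would normally pass time.time().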
5.) Optimization for near geography store.
Enterprise applications hosted on the cloud today are distributed on a global scale, and when distributed caches are hosted on a cloud, it would be desirable to keep the most commonly used items geographically near the consumer.
Dynamic cache clusters (not just cache groups, but a cache within a cache in a consistent hash implementation), wherein each target node internally maintains another distributed cache, could be employed. Dynamic cache cluster creation would be based on geo usage statistics and would require the nodes to group themselves into a cluster and designate one of themselves as a node in the parent cluster.
E.g. when usage from Bangalore is seen to be high for a specific object, this object could be moved to a cluster/node near Bangalore. Internal routing tables would then need to be updated to point to the new target node.
More than likely, in typical implementations, geo usage statistics would need to be derived for a group of objects rather than for individual objects. The group of objects could be based on an ID or even a derivative function of a record.
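Collecting geo usage per object group and deciding where the group should live might be sketched as below. GeoUsageStats, the group_of function and the 60% share threshold are assumptions for illustration; how regions map to actual clusters is left open.

```python
from collections import Counter, defaultdict


class GeoUsageStats:
    """Tracks which region requests each object group most often."""

    def __init__(self, group_of):
        self.group_of = group_of             # assumed function: key -> group id
        self._counts = defaultdict(Counter)  # group id -> Counter(region -> hits)

    def record(self, key, region):
        self._counts[self.group_of(key)][region] += 1

    def preferred_region(self, group, current_region, min_share=0.6):
        """Return the region that dominates usage, else keep the current one.

        A group is only moved when one region accounts for at least
        min_share of its recorded requests (an assumed policy).
        """
        counts = self._counts.get(group)
        if not counts:
            return current_region
        region, hits = counts.most_common(1)[0]
        share = hits / sum(counts.values())
        return region if share >= min_share else current_region
```

For example, with keys such as "order:1" grouped by the prefix before the colon, a routing table entry for the group "order" would be updated whenever preferred_region returns a region different from the current one.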
6.) Optimized Network utilization
Similar to point 3, optimal usage of the network is of high importance in any distributed system. Whenever routing happens (cases 1, 4 and 5 above), each node could internally maintain a spanning tree with weighted paths, each weight directly reflecting the historical usage of that particular network path, for better optimized usage of the network. Physical routers could be programmed to use a specific path based on what each node has learned.
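One way to use such path weights is to pick the route whose links have the lowest total historical usage. The sketch below uses Dijkstra's shortest-path search over the weighted links rather than building a spanning tree; the function name and the idea of using raw usage counts as weights are assumptions.

```python
import heapq


def least_used_path(links, source, target):
    """Find the path with the lowest total usage weight.

    links maps an undirected link (a, b) to its historical usage weight.
    Returns (total_weight, [nodes]) or None if the target is unreachable.
    """
    graph = {}
    for (a, b), weight in links.items():
        graph.setdefault(a, []).append((b, weight))
        graph.setdefault(b, []).append((a, weight))

    queue = [(0, source, [source])]  # (cost so far, node, path taken)
    seen = set()
    while queue:
        cost, node, path = heapq.heappop(queue)
        if node == target:
            return cost, path
        if node in seen:
            continue
        seen.add(node)
        for nxt, weight in graph.get(node, []):
            if nxt not in seen:
                heapq.heappush(queue, (cost + weight, nxt, path + [nxt]))
    return None
```

With links A-B and B-D weighted 10 each and links A-C and C-D weighted 3 and 4, a request from A to D would be routed through C, the less used path.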

Highly optimized Systems

Highly optimized caches would require one or more of the above strategies to be applied together wherever applicable. This would also require the fn(optimization parameters) to be derived on the go by the system, independent of any additional input.

Sunday 26 May 2013

Need of the hour : Strong mobility in .NET based distributed applications.

Typically, distributed applications written in .NET are either code distributed (assemblies moved across machines/nodes) or data distributed (data serialized across machines/nodes). What might additionally be required is strong mobility of a process/thread/TPL task/fiber etc. This effectively means serializing a task/thread along with its execution context. Once this is implemented, a thread/method/delegate function could theoretically be executed across machines, with the context flowing across machines seamlessly and the final result returning to the starting node.

Though a custom framework could be written to emulate a strongly mobile process by building on the underlying OS provisions, it would require the following :

a.) serializable custom execution context store for each mobile process
b.) serializable data store for all data - should be straightforward.
c.) serializable code that is independent, with minimal distributed locks and as stateless as possible.

Point a requires the framework to provide events for each context switch and a mechanism to signal the local execution system not to proceed with the current execution object, as it has been machine switched / context distributed.

All this requires very low level support from the execution runtime and maybe at the OS level too. Given the overall direction of distributed computing and the cloud, this becomes a necessity.
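Without runtime support, the closest emulation is a task that keeps its whole execution context in explicit, serializable fields, so that it can be paused, shipped and resumed elsewhere. The Python sketch below illustrates only that idea; it is not strong mobility of a real thread. MobileSum, its fields and the JSON wire format are assumptions.

```python
import json


class MobileSum:
    """Task whose execution context is held entirely in serializable fields."""

    def __init__(self, numbers, index=0, total=0):
        self.numbers = list(numbers)
        self.index = index  # explicit "program counter"
        self.total = total  # explicit local state

    def run(self, max_steps):
        """Run at most max_steps steps; return True when the task is finished."""
        steps = 0
        while self.index < len(self.numbers) and steps < max_steps:
            self.total += self.numbers[self.index]
            self.index += 1
            steps += 1
        return self.index >= len(self.numbers)

    def serialize(self):
        """Capture the execution context as bytes that can cross machines."""
        state = {"numbers": self.numbers, "index": self.index, "total": self.total}
        return json.dumps(state).encode("utf-8")

    @classmethod
    def deserialize(cls, payload):
        """Rebuild the task on the receiving side, ready to resume."""
        state = json.loads(payload.decode("utf-8"))
        return cls(state["numbers"], state["index"], state["total"])
```

A node would call run with a step budget, send serialize() to another node if the task is not finished, and the receiver would call deserialize followed by run to continue from the same point. The cost of this approach is that the code has to be written as an explicit state machine, which is exactly what runtime-level support would avoid.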

Saturday 25 May 2013

Message ReSequencing in a Distributed Publisher-Subscriber System

In a typical cloud based distributed environment with many message publishers and subscribers, a message could be processed by any of the subscribers, and which one is usually not predictable. In certain scenarios, it could be necessary to process a group of messages in sequence even though they reached the subscribers in random order.
One solution to this is described below. The solution tries to satisfy the following requirements :
1.) Make sure there is no tie-up / hard-link between subscribers and publishers: any message can be received by any subscriber and any publisher can push any message.
2.) When the messages are not grouped, the system continues to behave as before.
3.) When group messages are detected at a subscriber, the change in overall processing time should be minimal.
4.) When group messages are detected at a subscriber, no blocking operation should be performed and the subscriber should continue to be available for receiving and processing other messages.
Each group message is expected to contain the following items in addition to the message payload itself :
            a.) Group Message ID.
            b.) Total Number of Messages in group
            c.) Group Message Sequence Number.
e.g.:-
            a.) GROUP_1
            b.) 4 //GROUP_1 contains 4 messages in total
            c.) 2 //this means that this message is 2nd in the group GROUP_1.
This solution employs subscribers working in a distributed, cooperative manner. When a group message is received by a subscriber, it queries a distributed hash table to check whether any other subscriber is working on the same group ID. If yes, the received message is pushed to that subscriber. (A push endpoint is expected to be available for each subscriber. The list of endpoints could also be maintained in the same distributed hash table.) If there is no entry in the distributed hash table for this group, the subscriber adds itself to the distributed hash table, linking the message group ID with itself.
GroupHash[GROUP_1] = SUBSCRIBER_ID
Additionally, the message received is added into a local data structure/bag of the receiving subscriber.
When a new message is received in the group_message_queue, the following steps are executed by the watcher@subscriber owning the bag.
            a.) check if all messages for the group have been received.
If all messages have been received, the messages are sorted by Group_Message_Sequence_Number and processed one after the other, or as the logic demands for the group. The distributed hash entry for this group is cleared (GroupHash[GROUP_1] = "") once processing of the message group is complete.
The data structure/bag maintained by the subscriber would typically be filled in the following scenarios :
            a.) message pushed from another subscriber.
            b.) message pushed by the local listener since no other subscriber is working on this group.
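The ownership lookup, the bag and the in-order processing step can be sketched in a single process as below. A plain dict stands in for the distributed hash table, direct method calls stand in for push endpoints, and the names GroupMessage and Subscriber are illustrative. Duplicate deliveries and concurrent registration races are not handled in this sketch.

```python
from collections import namedtuple

# Fields mirror the three items listed above plus the payload.
GroupMessage = namedtuple("GroupMessage", "group_id total sequence payload")


class Subscriber:
    """Cooperative subscriber that collects and re-sequences group messages."""

    def __init__(self, name, group_hash, registry, processed):
        self.name = name
        self.group_hash = group_hash  # stand-in for the distributed hash table
        self.registry = registry      # subscriber name -> Subscriber (push endpoints)
        self.processed = processed    # shared output list, for illustration only
        self.bags = {}                # group id -> messages received so far
        registry[name] = self

    def receive(self, msg):
        """Entry point for a message delivered to this subscriber."""
        # Register as owner if nobody owns the group yet.
        owner = self.group_hash.setdefault(msg.group_id, self.name)
        if owner != self.name:
            self.registry[owner].push(msg)  # another subscriber owns the group
        else:
            self.push(msg)                  # local listener fills the local bag

    def push(self, msg):
        """Add a message to the bag and process the group once it is complete."""
        bag = self.bags.setdefault(msg.group_id, [])
        bag.append(msg)
        if len(bag) == msg.total:  # watcher step: all messages received?
            for m in sorted(bag, key=lambda m: m.sequence):
                self.processed.append((self.name, m.payload))
            del self.bags[msg.group_id]
            # Clear ownership; the text writes this as GroupHash[GROUP_1] = "".
            self.group_hash.pop(msg.group_id, None)
```

Neither receive nor push waits for missing messages: an incomplete group simply stays in the bag, so the subscriber remains free to handle other messages, in line with requirement 4.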
Partitioning within groups can be supported if required by applying the same strategy to sub-groups. In this case, the subscriber might need to wait for the sub-group messages to be processed before proceeding with the group messages.
The system can be enhanced so that a subscriber checks the load of the active group message subscriber (as pointed to by GroupHash) before pushing the message. This way, the subscriber that received the message can voluntarily take over ownership of the message group, especially if the message received was the last one expected for the group and therefore triggers processing.