Monday, 12 October 2020

Kafka Streams has an edge over Service Fabric ?

Compared against the .NET/Azure offerings, the level of abstraction enabled by Kafka Streams for event processing while exploiting underlying Kafka message-topic-queue patterns is pretty neat. 
 
Did come across an interesting framework that used C# libraries over Kafka Streams by @tonysneed in GitHub too here : https://wp.me/pWU98-1v2
 
Hope Service Fabric Mesh Reliable Actor or similar offerings from Azure catches up with Kafka Streams in terms of seamless integration for distributed event processing.
 
For a start, assuring messages are processed 'exactly-once' is a basic requirement for most distributed systems. Yet to come across native frameworks in the .NET world that use Azure/Akka.NET streams/Service Fabric Mesh or the likes that enable essential distributed capabilities like 'exactly-once' and others with minimal developer effort :!

#azure #kafka #confluent #kafkastreams #eventsourcing #akka #distributedcomputing #cloudarchitecture

Saturday, 29 June 2019

Software Engineering lost in the cloud?

It would seem the cloud is making you a lazy software engineer. Engineers these days are now have a ready answer for most of the architectural and design concerns - "its taken care at the cloud". This perception is scary and appears to makes any tom-dick-harry engineer with minimal to zero computer/software knowledge "become" "master" software-engineer overnight. This halo is bothering. Whatever happened to clean code / patterns essential to designing your software during the days of distributed computing setup in local clusters ? Perhaps none today cares about minimizing traffic across nodes and syncing time across nodes nor time sharing and optimizing resources during your minimal time at the node. Not sure the solution for this until you are choked to become yet another Harry. hashtag

Saturday, 12 January 2019

ServerFULL deployments

Moving away from Typical service deployments

Rather than have services typically tied to a set of machines and load balanced as-is today in the SOA/SaaS/Microservices world, what if we could just throw a set of servers and get them be assigned/allocated dynamically and more specifically, attain tight packing of services on the same hardware ?

Though mostly exploited on the Cloud with AWS Lamdas and Azure Functions, Serverless as a pattern are awesome for OnPremise deployments too. An interesting set of options for ServerLess OnPremise is available at this list. Though its quite a misnomer in cases of OnPremise deployments where we really need to bother about extreme and efficient hardware utilization of the server, it is preferable to call this approach as ServerFULL as the desired effect is to be fill up the server to the FULL ;)

Once the Docker Images/perhaps later Memory Images are available in shared In-memory/SSD drives, any of the machines/VM could be dynamically chosen for deploying the service and finally un-deployed down once done, allowing the space for the next.

OpenFaas/OpenWhisk seem to be on top of the list with both exploiting Docker containers. Though there is still constraints on elasticity (bringing up new VMs that finally run the Containers is time consuming, while adding more physical machine could take days), it is still an exciting means to efficiently exploit what is available on-premise in the moment.

Just like in Serverless world on the cloud, Services that consume high resources (CPU/RAM) for long duration and the ones that comparatively take higher time to spawn, might not be a candidate for being in the ServerFULL environment as these tend to block up the VMs/containers for long.

Think of designing typical business workflows with events, triggers, logic, nested flows and actions that span in/out, with these getting mapped into services by developers and further mapped to the ServerFULL world of machines dynamically - quite exciting times.

References: 


  1. https://martinfowler.com/articles/serverless.html
  2. https://winderresearch.com/a-comparison-of-serverless-frameworks-for-kubernetes-openfaas-openwhisk-fission-kubeless-and-more/


Wednesday, 11 July 2018

Structural Imbalance - In Software Systems


We come across many instances in the industry where "code-lumps" get deployed as software services/products with a beautiful UI included to cover up all the ugliness underneath. The design document too look fancy with usages of software patterns neatly listed. After all this stunt, these modules end up with a short life-span and before long, there are in-numerous critical issues being raised. 
In majority of these cases, product owners were forced into releasing these "code-lumps" that just weren't ready, while in other cases the anointed "architect" had no clues to why the "code-lumps" exist and why the pattern was used. At the first look, the software does appear to function as desired with all the components "working" great in the demos.

How could these be avoided in the first place ?

Just like in typical broken buildings we see across the road, structural imbalance refers to modules that doesn't making sense together. Individually, these chosen components / patterns appear perfect for the problem at hand but they just don't sync enough; structurally.
Right from a birds-eye/logical view to the drilled-down/code view, its critical that a dedicated team of architects reach consensus on the many choices being made every day by engineers.
Only if the team of architects had identified the applicable Non-Functional-Requirements (NFR) and defined them initially. Architects and the team of software designers could drill down into one or modules for a detailed design before coding. 
Though check-ins could be allowed from all engineers, none of it should reach the release pipeline until all the "code-lumps" were removed. Architects & designers must agree that the code comply towards the agreed NFR before promoting the code up its life-cycle.
Working closely with the architects, the product owner would now be more confident in communicating with the stake holders.
Do look forward related article on "Why all software engineers must NOT automatically become an 'Architect' " ; which in addition to looking at skill & interest, also touches upon the essential philosophical outlook required by any upcoming architect.

Friday, 9 August 2013

Self-optimization in Distributed caches.

Self-optimization in Distributed caches.


Distributed caches are systems where the cache data/objects are stored across distributed nodes/machine. When a data is stored/retrieved by the consuming application, one or more of systems in the distributed system serve the request. This paper attempts to identify self-optimization techniques that could be applied to this distributed cache. For a base implementation of the distributed cache, the open source project HoC (herd of cache @ http://hoc.codeplex.com) is referred. This project implements the distributed cache in .NET using the concepts of consistent hashing.
Self-Optimization in distributed computing refers to the capability of distributed systems to optimize independent of any intervention - machine/human. In a typical de-centralized and cooperative system like HoC, this means the nodes in the distributed cache can make decisions either independent or together. The latter would require the use of various consensus algorithms to be applied by this distributed cache.

Self-Optimization: Candidate Use Cases:
1.) Optimization of node load: decision made by internally by hosting nodes
In a typical consistent hash implementation, there is possibility that the number of objects stored in the cache of some of the nodes are high compared to the other neighboring nodes. This requires some of the data to be moved to the neighboring nodes. This would first include a node first asking the neighboring node for its load. If it detects that the total count of self is considerably higher, it would apply a partitioning of the objects stored and move the selected objects.
Locating an item in the cache would require multiple hops to reach the target node where the data is stored. Whenever a node gets a request for an item that has been moved to a neighboring node, it would require the call to be routed to the neighboring node. Each node is expected to maintain a list of objects that were moved and the target neighboring node to which the object was moved.
During each fetch, the path/nodes traversed to reach the target node could be returned back to the caller such that the next call to the same object directly calls the target server while avoiding the intermediary traverse across nodes.
The end result of this approach would be a more balanced store of objects across nodes.

2.) Self-Optimizing Consistent Hash Algorithm for load balancing
In a consistent hash implementation, similar to a hash bucket, the target node is selected based on the hash key returned by the underlying hashing algorithm. A typical problem would be that the data could get collected more at a specific server.  An alternative approach to solution 1 indicated above would be to apply machine learning approaches such that the change/adjustment -> fn(load distribution) required to adjust the hash algorithm can be identified. In this case, it should be noted that the fn(load distribution) required to normalize the overall load is specific to each system. A pattern could be detected for a specific system/installation and the load pattern for this system could be derived.
Applying this change to the underlying hash key algorithm would require a possible reset of the distributed system. Once reset, the adjustment learned/deduced by the system => fn(load distribution) would need to be applied each time a new object requires to be saved/retrieved. This adjustment function itself could be tweaked further down the time automatically by the system such that a new adjustment function is derived for the next run.
To monitor the overall usage pattern / load across nodes, it would be required to have a data store where the node v/s storage vs. load factor could be stored. Each data stored into the cache system would require its statistics to be stored into this data store. The next reset would require fn(load distribution) to be derived and applied to the underlying hash algorithm such that the load is more spread out in the next run.
This optimization technique assumes that the kind of data including its type, format, locale etc. does not vary considerably across resets.
3.) Optimized resource utilization on nodes
The CPU, RAM and other resources of each nodes would need to be used in a highly optimized fashion. Assuming these are not dedicated nodes, but machines shared by other processes too, it would be required to make sure the cache service does not overuse/bloat the machine resources. Optimized usage would require continuous monitoring of usage of these resources and adjusting the internal parameters accordingly. These parameters could be thread counts, memory allocated from heap, priority of thread/process (to free up CPU), receive/send buffer etc.
Each node should have capability to derive the optimal usage of resources on a continuous basis and refined after each optimization run. Parameter dependence (e.g.:- thread priority v/s memory) would be a factor that would need to derive again based on basic statistical record of resource usage. If the nodes are similar in deployment, learning from individual machine/node could be shared among other nodes.
4.) Optimization of node hit rate using duplicate stores.
If its seen that the hit rate of particular object/s is high on a specific node, it would be desired to have duplicates of the same object stored across nodes or across duplicate nodes such that a virtual relay/routing mechanism could be employed to divert the underlying request call. A virtual software relay could be employed just before this set of nodes such that it could route/direct to one of the clone/duplicate nodes. This mechanism assumes custom relay code that determines if the data has been duplicated and then diverts accordingly.
For this self-optimization, the systems needs to have a knowledge base that knows whether a duplicate item is being stored and its hit rate. Each node would need to determine based on the object hit rate in a time duration on whether to duplicate this object. In addition to basic object hit rate frequency, the system can learn from patterns in object usage – a specific group of objects might see high hit rate during Mondays and the system might assign duplicate nodes automatically on Mondays based on the learned hit rate pattern.
This method of store can be exploited as a disaster recovery option too. If one of the node in the duplicate set goes down, we are assured that the system continues to work as the service can now be taken care by the other nodes in the duplicate set.
5.) Optimization for near geography store.
Enterprise applications hosted on the cloud today are distributed on a global scale and when distributed caches are hosted on a cloud, it would be desired to have the most commonly used items near to the consumer geographically.
Dynamic cache clusters (not just cache groups, but cache within a cache in a consistent hash implementation) wherein each target node internally maintains another set of distributed cache could be employed. The dynamic cache cluster creation would be based on the geo usage statistics and would require the nodes to group themselves into a cluster and allocate one of it as a node in the parent cluster.
E.g.:- when the usage across Bangalore is seen to be high for a specific object, this object could be moved to a cluster/node near Bangalore. Internally routing tables would need to be updated accordingly to now point to the new target node.
More than likely, in typical implementations, it would be required to derive geo usage statistics for a group of objects rather than independent objects. The group of objects could be based on an ID or even a derivative function of a record.
6.) Optimized Network utilization
Similar to point 3, optimal usage of network is of high importance in any distributed system.  Whenever a routing happens (cases 5, 4, 1 mentioned above), each node could internally maintain a spanning tree with weightage of paths, with weightage directly reflecting the historical usage of that particular network path for a better optimized usage of the network. Physical routers could be programmed to use a specific path based on learning by each node.

Highly optimized Systems

Highly optimized caches would require one or more of the above strategies to be applied together wherever applicable. This would also require the fn(optimization parameters) to be derived on the go by the system independent of any additional input. 

Sunday, 26 May 2013

Need of the hour : Strong mobility in .NET based distributed applications.

Typically, distributed application written in .NET are either code distributed (assemblies moved across machines/nodes) or data distributed (data serialized across machines/nodes). What might additionally be required is strong mobility of process/thread/ TPL task/fiber etc. This effectively means serializing a task/thread with its execution context too. Once this is implemented, this means that a thread/method/delegate function would theoretically be executed across machines, with the context flowing across machines seamlessly and the final result getting back to the starting node.

Though custom framework could be written to emulate strong mobile process by building over the underlying OS provisions, it would require the following :

a.) serializable custom execution context store for each mobile process
b.) serializable data store store for all data - should be straightforward.
c.) serializable code that is independent, with minimal distributed locks and stateless as possible.

Point 1 requires the framework to have provision for events for each context switch and mechanism to indicate back to the local execution system not to proceed with the current execution object as it has been machine switched / context distributed.

All this requires very low level support for the execution runtime and maybe at an OS level too. Given the overall direction of distributed computing and the cloud, this becomes a necessity.

Saturday, 25 May 2013

Message ReSequencing in a Distributed Publisher-Subscriber System

Message ReSequencing in a Distributed Publisher-Subscriber System

In a typical cloud based distributed environment with many message publishers and subscribers, the message could be processed by any of the subscriber and this is usually not predictable. In certain scenarios, it could be necessary to have a group of messages processed in sequence though they reached the subscribers randomly.
One solution to this is described below. The solutions tries to satisfy the following requirements :
1.) Make sure there is no tie-up / hard-link between subscribers and publishers: Any message can be received by any subscriber and any publisher can push any message.
2.) When the messages are not grouped, the system continues to behave as usual / before.
3.) When group messages are detected at subscriber, change in overall processing time should be minimal.
4.) When group message are detected at subscriber, no blocking operation should be performed and the subscriber should continue to be available for receiving and processing other messages.
Each group message is expected to contain the following items in addition to the message pay load itself :
            a.) Group Message ID.
            b.) Total Number of Messages in group
            c.) Group Message Sequence Number.
e.g.:-
            a.) GROUP_1
            b.) 4 //GROUP_1 contains 4 messages in total
            c.) 2 //this means that this message is 2nd in the group GROUP_1.
This solution employs subscribers working in a distributed cooperative manner. As a group message is received by a subscriber, it queries the distributed hash table to check if any other subscriber is working on the same group number. If yes, the message received is pushed onto that subscriber. (A push endpoint is expected to be available for each subscriber. This list of end points too could be maintained in the distributed hash table indicated earlier.) If there is no entry in the distributed hash table for this group, the subscriber adds itself into the distributed hash linking the message group id with itself.
GroupHash[GROUP_1] = SUBSCRIBER_ID
Additionally, the message received is added into a local data structure/bag of the receiving subscriber.
When a new message is received in the group_message_queue, the following steps are executed by the watcher@subscriber owning the bag.
            a.) check if all messages for the group has been received.
If all messages have been received, the messages are sorted based on the Group_Message_Sequence_Number and processed one after the other or as the logic demands for the group. Distributed hash entry is cleared for this group GroupHash[GROUP_1] = "" once the processing of message group is complete.
The data structure/bag maintained by the subscriber would be typically filled in the following scenarios :
            a.) message pushed from another subscriber.
            b.) message pushed by the local listener since no other subscriber is working on this group.
Partitioning within groups can be employed if required by employing the same strategy for sub-groups. In this case, it could be required by subscriber to wait for the sub-group messages to be processed before the proceeding with the group messages.
The system can be enhanced such that the subscribers internally check the load of active group message subscriber (as pointed by GroupHash) before pushing the message. This way, the subscriber that received the message can take over the ownership of group message voluntarily; especially if the message received was the last of the group message expected, requiring a message process flow.