Important things

  • 302 HTTP response with Location header for URL redirection (GET and HEAD); 307 for temporary redirection
  • Spring Sleuth - tracing in microservices
  • https://astikanand.github.io/techblogs/high-level-system-design/design-bookmyshow
  • https://www.hellointerview.com/learn/system-design/in-a-hurry/introduction

Tuesday, 29 June 2021

Design patterns



Creational patterns

Abstract factory (recognizable by creational methods returning the factory itself, which in turn can be used to create another abstract/interface type)

Builder (recognizable by creational methods returning the instance itself)

Factory method (recognizable by creational methods returning an implementation of an abstract/interface type)
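
A minimal sketch of that recognition rule (the Shape and ShapeFactory names are made up for illustration):

```java
// Factory method: the creational method returns an implementation of an abstract/interface type.
interface Shape { void draw(); }

class Circle implements Shape { public void draw() { System.out.println("circle"); } }
class Square implements Shape { public void draw() { System.out.println("square"); } }

class ShapeFactory {
    // Callers only ever see the Shape interface, never the concrete class.
    static Shape create(String type) {
        switch (type) {
            case "circle": return new Circle();
            case "square": return new Square();
            default: throw new IllegalArgumentException(type);
        }
    }
}
```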

Prototype (recognizable by creational methods returning a different instance of itself with the same properties)

Singleton (recognizable by creational methods returning the same instance (usually of itself) every time)
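
A minimal sketch of that shape (AppConfig is a made-up name; java.lang.Runtime#getRuntime() is a real JDK method with the same behaviour):

```java
// Singleton: the creational method returns the same instance every time.
class AppConfig {
    private static final AppConfig INSTANCE = new AppConfig();

    private AppConfig() { }              // prevent external instantiation

    static AppConfig getInstance() {     // always returns the one shared instance
        return INSTANCE;
    }
}
```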


Structural patterns

Adapter (recognizable by creational methods taking an instance of a different abstract/interface type and returning an implementation of its own/another abstract/interface type which decorates/overrides the given instance)

Bridge (recognizable by creational methods taking an instance of a different abstract/interface type and returning an implementation of its own abstract/interface type which delegates to/uses the given instance)

  • None comes to mind yet. A fictive example would be new LinkedHashMap(LinkedHashSet<K>, List<V>) which returns an unmodifiable linked map that doesn't clone the items but uses them. The java.util.Collections#newSetFromMap() and singletonXXX() methods, however, come close.

Composite (recognizable by behavioral methods taking an instance of the same abstract/interface type into a tree structure)

Decorator (recognizable by creational methods taking an instance of the same abstract/interface type and adding additional behaviour)
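
A minimal decorator sketch (the Notifier names are made up; java.io.BufferedInputStream wrapping another InputStream is a real JDK example of the same shape):

```java
// Decorator: the constructor takes an instance of the same interface and adds behaviour around it.
interface Notifier { void send(String message); }

class EmailNotifier implements Notifier {
    public void send(String message) { System.out.println("email: " + message); }
}

class LoggingNotifier implements Notifier {
    private final Notifier delegate;                  // same abstract type as itself

    LoggingNotifier(Notifier delegate) { this.delegate = delegate; }

    public void send(String message) {
        System.out.println("sending: " + message);    // added behaviour
        delegate.send(message);                       // then delegate to the wrapped instance
    }
}
```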

Facade (recognizable by behavioral methods which internally use instances of different, independent abstract/interface types)

Flyweight (recognizable by creational methods returning a cached instance, a bit like the "multiton" idea)

Proxy (recognizable by creational methods returning an implementation of the given abstract/interface type which in turn delegates to/uses a different implementation of that type)


Behavioral patterns

Chain of responsibility (recognizable by behavioral methods which (indirectly) invoke the same method in another implementation of the same abstract/interface type in a queue)

Command (recognizable by behavioral methods in an abstract/interface type which invoke a method in an implementation of a different abstract/interface type that has been encapsulated by the command implementation during its creation)

Interpreter (recognizable by behavioral methods returning a structurally different instance/type of the given instance/type; note that parsing/formatting is not part of the pattern; determining the pattern and how to apply it is)

Iterator (recognizable by behavioral methods sequentially returning instances of a different type from a queue)

Mediator (recognizable by behavioral methods taking an instance of a different abstract/interface type (usually using the command pattern) which delegates to/uses the given instance)

Memento (recognizable by behavioral methods which internally change the state of the whole instance)

Observer (or Publish/Subscribe) (recognizable by behavioral methods which invoke a method on an instance of another abstract/interface type, depending on the instance's own state)

State (recognizable by behavioral methods which change their behaviour depending on the instance's state, which can be controlled externally)

Strategy (recognizable by behavioral methods in an abstract/interface type which invoke a method in an implementation of a different abstract/interface type that has been passed in as a method argument to the strategy implementation)
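
The JDK's sort methods show this directly; a minimal sketch:

```java
// Strategy: sort() invokes compare() on the Comparator implementation passed in as an argument.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StrategyDemo {
    public static void main(String[] args) {
        List<String> names = new ArrayList<>(List.of("Charlie", "alice", "Bob"));
        // The Comparator is the interchangeable strategy.
        names.sort(Comparator.comparing(String::toLowerCase));
        System.out.println(names);   // [alice, Bob, Charlie]
    }
}
```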

Template method (recognizable by behavioral methods which already have a "default" behaviour defined by an abstract type)
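
A minimal sketch (the DataImporter names are made up for illustration):

```java
// Template method: the abstract type fixes the overall algorithm; subclasses fill in the steps.
abstract class DataImporter {
    final void run() {                    // the template method itself
        String raw = read();
        String parsed = parse(raw);
        store(parsed);
    }

    abstract String read();
    abstract String parse(String raw);

    // "Default" behaviour that subclasses may override.
    void store(String parsed) { System.out.println("storing: " + parsed); }
}
```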

Visitor (recognizable by two different abstract/interface types which each define a method that takes the other abstract/interface type; the one actually calls the method of the other and the other executes the desired strategy on it)

Sunday, 27 June 2021

Understanding Kafka Topics and Partitions

Let's look at an overview of the producer components:

[Diagram: overview of producer components]

1. When a producer is producing a message, it will specify the topic it wants to send the message to, is that right? Does it care about partitions?

The producer decides the target partition for each message, depending on:

  • Partition id, if it is specified within the message
  • hash(key) % number of partitions, if no partition id is given but a message key is
  • Round robin, if neither a partition id nor a message key is available, i.e. only the value is present
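
A minimal producer sketch illustrating the three cases (the topic name, key, and broker address are assumptions for illustration):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class PartitioningDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 1. Explicit partition id -> used as-is
            producer.send(new ProducerRecord<>("orders", 0, "order-42", "payload"));
            // 2. Key but no partition id -> default partitioner: hash(key) % number of partitions
            producer.send(new ProducerRecord<>("orders", "order-42", "payload"));
            // 3. Value only -> partitions are rotated (round robin / sticky, depending on client version)
            producer.send(new ProducerRecord<>("orders", "payload"));
        }
    }
}
```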

2. When a subscriber is running, does it specify its group id so that it can be part of a cluster of consumers of the same topic, or of several topics that this group of consumers is interested in?

You should always configure group.id unless you are using the simple assignment API and you don't need to store offsets in Kafka. Without a group.id, the consumer will not be part of any group.
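
A minimal consumer sketch with group.id set (the group name, topic, and broker address are made up for illustration):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class GroupedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");     // assumed broker address
        props.put("group.id", "billing-service");              // membership in this consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));              // group members share the topic's partitions
            consumer.poll(Duration.ofMillis(500));
        }
    }
}
```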

3. Does each consumer group have a corresponding partition on the broker or does each consumer have one?

In one consumer group, each partition will be processed by one consumer only. These are the possible scenarios:

  • Number of consumers is less than the number of topic partitions: multiple partitions can be assigned to one of the consumers in the group.
  • Number of consumers is the same as the number of topic partitions: each consumer is assigned exactly one partition.
  • Number of consumers is higher than the number of topic partitions: the extra consumers stay idle, which is not effective (in the original example, Consumer 5 gets no partition).

4. Since the partitions are created by the broker, are they therefore not a concern for the consumers?

Consumers should be aware of the number of partitions, as discussed in question 3.

5. Since this is a queue with an offset for each partition, is it the responsibility of the consumer to specify which messages it wants to read? Does it need to save its state?

Kafka (to be specific, the Group Coordinator) takes care of the offset state by producing a message to an internal __consumer_offsets topic. This behavior can be switched to manual by setting enable.auto.commit to false; in that case consumer.commitSync() and consumer.commitAsync() can be helpful for managing offsets.
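
A minimal sketch of manual offset management (the topic, group name, and the process() helper are assumptions for illustration):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "billing-service");
        props.put("enable.auto.commit", "false");                // take over offset management
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);                              // hypothetical business logic
                }
                consumer.commitSync();                            // commit only after processing succeeded
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) { /* ... */ }
}
```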

More about Group Coordinator:

  1. It is one of the brokers in the cluster, elected on the Kafka server side.
  2. Consumers interact with the Group Coordinator for offset commit and offset fetch requests.
  3. Consumers send periodic heartbeats to the Group Coordinator.

6. What happens when a message is deleted from the queue? For example: the retention was 3 hours, then the time passes; how is the offset handled on both sides?

If a consumer starts after the retention period, messages will be consumed as per the auto.offset.reset configuration, which can be latest or earliest. Technically it behaves like latest (start processing new messages), because all the older messages have expired by that time; note that retention is a topic-level configuration.

Saturday, 26 June 2021

Design a distributed Job Scheduler

 Functional requirements (can vary but I assume the following):

  • A job can be scheduled for one time or multiple executions (cron job) by other services/microservices
  • For each job, a class can be specified which implements an interface like IJob, so that we can later call that interface method on the worker nodes when we execute the job (that class can e.g. be present in a .jar file on the worker nodes); a minimal sketch of such an interface follows this list.
  • Results of job executions are stored and can be queried
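
A minimal sketch of what the IJob contract could look like (the method signature and the parameter type are assumptions for illustration, not a fixed API):

```java
import java.util.Map;

// Implemented by the class named in the Job's JobExecutorClass field;
// worker nodes call execute() when the job is due.
public interface IJob {
    void execute(Map<String, String> parameters) throws Exception;   // parameters come from the Job's Data field
}
```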

Non-functional requirements (again, can vary but I assume the following):

  • Scalability: Thousands or even millions of jobs can be scheduled and run per day
  • Durability: Jobs must not get lost -> we need to persist jobs
  • Reliability: Jobs must not be executed much later than expected or dropped -> we need a fault-tolerant system
  • Availability: It should always be possible to schedule and execute jobs -> (dynamical) horizontal scaling
  • Jobs must not be executed multiple times (or such occurrences should be kept to a minimum)

Domain Analysis: Concepts
We can define the Domain Model that can later be converted into a Data Model (a database schema, or a model for ZooKeeper):

Job:

  • Represents a Job to be executed
  • Properties:
    Id, Name, JobExecutorClass, Priority, Running, LastStartTime, LastEndTime, LastExecutor, Data (Parameters)

Trigger (based on the concept Quartz Scheduler uses):

  • Defines when a Job is executed
  • We can define different Triggers like: OneTimeTrigger, CronTrigger
  • Based on the type, we have properties like:
    Id, Type, StartTime, EndTime, OneTime, Cronjob, Interval

Executor:

  • Is a single Job Executor/Worker Node
  • Can have Properties like:
    Id (e.g. IP-based), LastHeartBeat

The multiplicity between Job and Trigger: Job--1-------*--Trigger (A Job can have multiple Triggers)
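
As a rough sketch, the Domain Model could look like this in plain Java (the field types are assumptions, since only the property names are listed above):

```java
import java.time.Instant;
import java.util.Map;

class Job {
    long id;                        // e.g. a Snowflake ID
    String name;
    String jobExecutorClass;        // class implementing IJob, present on the worker nodes
    int priority;
    boolean running;
    Instant lastStartTime;
    Instant lastEndTime;
    String lastExecutor;            // e.g. IP-based Executor id
    Map<String, String> data;       // job parameters
}

class Trigger {
    long id;
    String type;                    // e.g. ONE_TIME or CRON
    Instant startTime;
    Instant endTime;
    String cronExpression;          // only used by a CronTrigger
    long intervalMillis;            // only used by interval-based triggers
}

class Executor {
    String id;                      // e.g. IP-based
    Instant lastHeartBeat;
}
```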


HLD (High Level Design)
Now, let's look at a high-level System Design diagram:

[High-level system design diagram]

Microservices that want to schedule a non-recurring or recurring Job: Can send a Message (or produce, in Kafka terminology) to the corresponding Kafka Queue (more precisely, a Topic).

Job Scheduler Service: Will consume the Messages (each requesting that a Job be enqueued). They will generate a unique Id using e.g. the Snowflake ID Generation concept. Based on that ID (e.g. by hashing it) they decide into which Database Partition the Job will go. They create a Job and Trigger record according to the Message in the corresponding Database Partition.
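
A minimal sketch of that hash-based routing (the class name and the way the partition count is obtained are assumptions for illustration):

```java
public final class PartitionRouter {
    private final int numPartitions;

    public PartitionRouter(int numPartitions) {
        this.numPartitions = numPartitions;   // e.g. loaded from the ZooKeeper partitioning info
    }

    // Snowflake-style ids are 64-bit longs; floorMod keeps the result non-negative.
    public int partitionFor(long jobId) {
        return Math.floorMod(Long.hashCode(jobId), numPartitions);
    }
}
```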

RDBMS: I chose an RDBMS because we will later need ACID properties, meaning transactions. The database is sharded into an adequate number of shards to distribute the load and data. We use Active-Passive/Master-Slave Replication for each Partition in a semi-synchronous fashion. One Slave/Follower will follow synchronously while the others will receive the Replication Stream asynchronously. That way, we can be sure that at least one Slave holds up to date data in case the Master fails (due to network partitions, server outage, etc.) and that Slave will be promoted to be the new Leader.

Job Executor Service:
1. On Startup it will fetch the Database Partitioning info from ZooKeeper as well as the Partition Assignment between other instances and the Database Partitions.
2. It will choose the Database Partition which has the least number of Executors assigned, to balance out the number of Executors that execute Jobs across all the different Database Partitions.
3. It will send/store the Partition Assignment to ZooKeeper.
4. It constantly sends Heartbeats to ZooKeeper
5. It pulls information from the Database Partition and fights with other Executor instances assigned to the same Database Partition for Jobs that are due to execute. The fighting works by using Row Locks. That is why we need transactional properties (hence an RDBMS that supports them; not all really do!). A sketch of this locking follows this list.
6. Before an Executor Node executes a Job, it will update the Job record in the DB: flagging it as "Running", storing the "StartTime" and which Executor node (=itself) is executing the Job, etc.
7. When an Executor Node fails then other Nodes (assigned the same Partition) can detect it using ZooKeeper (due to the Heartbeating). They can then find all the Jobs that the failed Node was executing (Flag=Running and LastExecutor=Failed Node) and can fight for those Jobs to execute them (we could make a Job configurable to retry or not in case of execution failure).
8. Finally, after successfully/unsuccessfully executing a Job, we send/publish/produce a Message to another Kafka Queue.
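
One way the row-lock "fighting" from step 5 could look with plain JDBC; the table/column names and the use of FOR UPDATE SKIP LOCKED (available e.g. in PostgreSQL and MySQL 8+) are assumptions for illustration:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class JobClaimer {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:postgresql://db-partition-1/jobs")) {
            conn.setAutoCommit(false);
            String claimSql =
                "SELECT id FROM job " +
                "WHERE running = false AND next_fire_time <= now() " +
                "ORDER BY priority DESC " +
                "LIMIT 10 FOR UPDATE SKIP LOCKED";        // competing Executors simply skip locked rows
            try (PreparedStatement claim = conn.prepareStatement(claimSql);
                 ResultSet rs = claim.executeQuery()) {
                while (rs.next()) {
                    long jobId = rs.getLong("id");
                    // Step 6: flag the job as running and record which Executor took it
                    try (PreparedStatement mark = conn.prepareStatement(
                            "UPDATE job SET running = true, last_executor = ?, last_start_time = now() WHERE id = ?")) {
                        mark.setString(1, "executor-10.0.0.5");   // assumed IP-based Executor id
                        mark.setLong(2, jobId);
                        mark.executeUpdate();
                    }
                }
            }
            conn.commit();   // releases the row locks; other Executors now see running = true
        }
    }
}
```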

Regarding Point 7 (to broaden the horizon of possibilities): We could also use Broadcast Messaging or a Gossip Protocol to detect Node failures. I'm excited to hear your reasoning.

Result Handler Service: Will consume Messages and store the Execution Result in a NoSQL database like Cassandra. Cassandra has great write throughput because it is leaderless (no time lost for failovers, for example), replicates asynchronously (configurable), etc. We are also okay with Eventual Consistency because it is not crucial if we see a result with some delay.

Coordination Service, ZooKeeper: Stores the above mentioned information. Regarding the Database Partitioning information: We can e.g. load that info into ZooKeeper from another Service/Configuration file.

Message Queues, in general: We use Message Queues (here Kafka, which is more of an append log than a Queue in the conventional sense; you can find great docs on the official website) in order to:

  • Be able to scale the consumer and producer nodes independently (in Kafka we have Topics which can be horizontally partitioned, and we scale through that)
  • We decouple the consumer and producer from each other
  • Lower latency for the producer (it doesn't have to wait for a response)
  • Durability and Reliability: When a Consumer Node crashes, another Node can process the Message which otherwise would be lost (see Offset in Kafka). The Messages are persisted.
  • We can throttle/limit the number of messages the consumers process (see Backpressure)
  • Kafka offers message ordering (within a partition)

Other Issues and Concerns

  • Unreliable Clocks/Time: In a distributed system we have unreliable clocks and time (due to unbounded delays when requesting NTP time, because we are using packet-switched networks and usually not circuit-switched ones; unreliable NTP servers; quartz clocks that develop an offset; etc.). When we want to schedule Jobs reliably and execute them at the right time, clocks and time play an important role. We therefore need to make sure that the times on the nodes are synchronized and don't differ too much. One way to achieve that is to use multiple NTP servers and filter out those that deviate too much. Another, more reliable but costly way is to use Atomic Clocks in the Data Center(s).

Final Words:

  • Of course this is a somewhat “simple” approach that would probably fit into a 45-minute system design interview
  • I’m aware that there are a lot of difficulties in actually building such a system, e.g. distributing Jobs correctly based on the load/work they have to do (and using the CPUs most efficiently), exactly-once delivery (/execution), and so on. Feel free to start a discussion :).
  • I didn’t go deeply into things like how ZooKeeper or Apache Kafka work since that would go beyond the scope of this post. I assume that you know that those systems are already built in a distributed, fault-tolerant way.