Important things

  • 302 (Found): HTTP response with a Location header for URL redirection (GET and HEAD)
  • 307 (Temporary Redirect): temporary redirection that preserves the request method
  • Spring Cloud Sleuth: tracing in microservices
  • https://astikanand.github.io/techblogs/high-level-system-design/design-bookmyshow
  • https://www.hellointerview.com/learn/system-design/in-a-hurry/introduction

Sunday, 27 June 2021

Understanding Kafka Topics and Partitions

Let's look at an overview of producer components:

[Image: overview of producer components]

1. When a producer produces a message, it specifies the topic it wants to send the message to, is that right? Does it care about partitions?

The producer decides the target partition for each message, depending on:

  • The partition id, if one is specified in the message
  • hash(key) % number of partitions, if no partition id is given but a key is
  • Round robin, if neither a partition id nor a message key is available, meaning the message carries only a value (newer client versions use a sticky partitioner for keyless messages instead)
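The three rules above can be sketched in a few lines. This is only an illustration: choose_partition is a hypothetical helper, not a Kafka API, and zlib.crc32 stands in for the murmur2 hash that Kafka's default partitioner applies to the key.

```python
import zlib
from itertools import count
from typing import Optional

# Shared counter used to rotate keyless messages over the partitions.
_round_robin = count()


def choose_partition(num_partitions: int,
                     key: Optional[bytes] = None,
                     partition: Optional[int] = None) -> int:
    """Pick a target partition following the three rules listed above."""
    if partition is not None:
        # Rule 1: an explicit partition id always wins.
        if not 0 <= partition < num_partitions:
            raise ValueError("partition id out of range")
        return partition
    if key is not None:
        # Rule 2: hash the key, then take it modulo the partition count.
        # crc32 is an assumption for this sketch; Kafka itself uses murmur2.
        return zlib.crc32(key) % num_partitions
    # Rule 3: neither partition id nor key, so rotate over the partitions.
    return next(_round_robin) % num_partitions
```

The useful property is in rule 2: the same key always maps to the same partition (as long as the partition count does not change), which is what gives per-key ordering.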

2. When a subscriber is running, does it specify its group id so that it can be part of a cluster of consumers of the same topic, or of several topics that this group of consumers is interested in?

You should always configure group.id unless you are using the simple assignment API and you don’t need to store offsets in Kafka. In that case the consumer will not be part of any group.

3. Does each consumer group have a corresponding partition on the broker, or does each consumer have one?

Within one consumer group, each partition is processed by exactly one consumer. These are the possible scenarios:

  • The number of consumers is less than the number of topic partitions: multiple partitions are assigned to some of the consumers in the group. [Image: number of consumers less than topic partitions]
  • The number of consumers is the same as the number of topic partitions: each consumer gets exactly one partition. [Image: number of consumers same as number of topic partitions]
  • The number of consumers is higher than the number of topic partitions: the extra consumers get no partition and stay idle, which is not effective (see Consumer 5 in the image). [Image: number of consumers more than number of topic partitions]
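To make the three scenarios concrete, here is a minimal sketch that deals partitions out to the consumers of one group in round-robin order. assign_partitions is a hypothetical helper; Kafka's real assignors (range, round robin, sticky, cooperative sticky) are more involved, but the resulting shapes are the same.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int],
                      consumers: List[str]) -> Dict[str, List[int]]:
    """Give every partition to exactly one consumer of the group."""
    if not consumers:
        raise ValueError("a group needs at least one consumer")
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        # Deal the partitions out like cards, one consumer after another.
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


# Fewer consumers than partitions: some consumers own several partitions.
fewer = assign_partitions([0, 1, 2, 3], ["c1", "c2"])
# Same number: a one-to-one mapping.
same = assign_partitions([0, 1, 2, 3], ["c1", "c2", "c3", "c4"])
# More consumers than partitions: the extra consumer (c5) stays idle.
more = assign_partitions([0, 1, 2, 3], ["c1", "c2", "c3", "c4", "c5"])
```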

4. Since the partitions are created by the broker, are they not a concern for the consumers?

Consumers should be aware of the number of partitions, because it limits how many consumers in a group can do useful work, as discussed in question 3.

5. Since this is a queue with an offset for each partition, is it the responsibility of the consumer to specify which messages it wants to read? Does it need to save its state?

Kafka (specifically the Group Coordinator) keeps track of the offset state by writing committed offsets to the internal __consumer_offsets topic. By default the consumer commits offsets automatically. This can be switched to manual commits by setting enable.auto.commit to false; in that case consumer.commitSync() and consumer.commitAsync() are used to manage the offsets.

More about Group Coordinator:

  1. It is one of the brokers in the cluster, elected on the Kafka server side.
  2. Consumers interact with the Group Coordinator for offset commit and offset fetch requests.
  3. Each consumer sends periodic heartbeats to the Group Coordinator.
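To show what manual offset management buys you, here is a toy single-partition simulation. It does not use the Kafka client; PartitionConsumer and its methods are hypothetical names. The offset is committed only after a batch was processed, so a crash in the middle of a batch leads to reprocessing (at-least-once) rather than to lost messages.

```python
from typing import Callable, List


class PartitionConsumer:
    """Toy consumer of one partition that tracks a committed offset."""

    def __init__(self, log: List[str]) -> None:
        self.log = log
        self.committed = 0  # next offset to read after a (re)start

    def poll_and_process(self, handler: Callable[[str], None],
                         max_records: int = 2) -> None:
        batch_end = min(self.committed + max_records, len(self.log))
        for offset in range(self.committed, batch_end):
            handler(self.log[offset])  # if this raises, nothing is committed
        # Plays the role of commitSync(): commit only after processing.
        self.committed = batch_end


seen: List[str] = []
fail_once = {"c"}


def handler(message: str) -> None:
    seen.append(message)
    if message in fail_once:
        fail_once.discard(message)
        raise RuntimeError("simulated crash while processing")


consumer = PartitionConsumer(["a", "b", "c", "d"])
consumer.poll_and_process(handler)      # processes a, b and commits offset 2
try:
    consumer.poll_and_process(handler)  # crashes on c, offset stays at 2
except RuntimeError:
    pass
consumer.poll_and_process(handler)      # c is processed again, then d
```

Message c shows up twice in seen, which is why handlers in such a setup should be idempotent.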

6. What happens when a message is deleted from the queue? For example: the retention is 3 hours and that time passes. How is the offset handled on both sides?

If a consumer starts after the retention period and has no valid committed offset to resume from, messages are consumed according to the auto.offset.reset configuration, which can be latest or earliest. In practice both behave like latest here (start processing new messages), because all the older messages have expired by that time. Offsets of deleted messages are not reused; new messages simply continue from the next offset. Retention is a topic-level configuration.
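The decision described above can be written down as a small function. This is a sketch of the rule, not client code: resolve_start_offset is a hypothetical helper, and log_start / log_end stand for the oldest retained offset and the offset the next new message will get.

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int],
                         log_start: int,
                         log_end: int,
                         auto_offset_reset: str = "latest") -> int:
    """Return the offset a consumer starts reading from."""
    if committed is not None and log_start <= committed <= log_end:
        # The committed offset still points into the retained log.
        return committed
    # No committed offset, or it points at data that retention deleted.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # Any other value (e.g. "none") is treated as "do not guess".
    raise LookupError("no valid offset and no reset policy")
```

When every message has expired, log_start equals log_end, so earliest and latest give the same answer.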

Saturday, 26 June 2021

Design a distributed Job scheduler

Functional requirements (can vary but I assume the following):

  • A job can be scheduled for one time or multiple executions (cron job) by other services/microservices
  • For each job a class can be specified that implements an interface such as IJob, so that we can later call that interface's method on the worker nodes when we execute the job. (That class can e.g. be present in a .jar file on the worker nodes.)
  • Results of job executions are stored and can be queried

Non-functional requirements (again, can vary but I assume the following):

  • Scalability: Thousands or even millions of jobs can be scheduled and run per day
  • Durability: Jobs must not get lost -> we need to persist jobs
  • Reliability: Jobs must not be executed much later than expected or dropped -> we need a fault-tolerant system
  • Availability: It should always be possible to schedule and execute jobs -> (dynamic) horizontal scaling
  • Jobs must not be executed multiple times (or such occurrences should be kept to a minimum)

Domain Analysis: Concepts
We can define the Domain Model that can later be converted into a Data Model (Database Model for Schema or a Model for ZooKeeper):

Job:

  • Represents a Job to be executed
  • Properties:
    Id, Name, JobExecutorClass, Priority, Running, LastStartTime, LastEndTime, LastExecutor, Data (Parameters)

Trigger (based on the concept Quartz Scheduler uses):

  • Defines when a Job is executed
  • We can define different Triggers like: OneTimeTrigger, CronTrigger
  • Based on the type, we have properties like:
    Id, Type, StartTime, EndTime, OneTime, Cronjob, Interval

Executor:

  • Is a single Job Executor/Worker Node
  • Can have Properties like:
    Id (e.g. IP-based), LastHeartBeat

The multiplicity between Job and Trigger: Job--1-------*--Trigger (A Job can have multiple Triggers)
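As a rough sketch, the Job and Trigger concepts and their 1-to-many relation could look like this. The field names follow the properties listed above; the next_fire_time methods are my own addition for illustration, and the cron case is left out because the standard library has no cron parser.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional


@dataclass
class Trigger:
    id: int
    start_time: datetime
    interval: Optional[timedelta] = None  # None means a one-time trigger
    end_time: Optional[datetime] = None

    def next_fire_time(self, after: datetime) -> Optional[datetime]:
        """First fire time strictly after `after`, or None if there is none."""
        if self.interval is None:
            return self.start_time if self.start_time > after else None
        if after < self.start_time:
            candidate = self.start_time
        else:
            # Number of whole intervals already passed, plus the next one.
            periods = (after - self.start_time) // self.interval + 1
            candidate = self.start_time + periods * self.interval
        if self.end_time is not None and candidate > self.end_time:
            return None
        return candidate


@dataclass
class Job:
    id: int
    name: str
    job_executor_class: str
    priority: int = 0
    running: bool = False
    last_start_time: Optional[datetime] = None
    last_end_time: Optional[datetime] = None
    last_executor: Optional[str] = None
    data: Dict[str, str] = field(default_factory=dict)
    triggers: List[Trigger] = field(default_factory=list)  # Job 1 --- * Trigger

    def next_fire_time(self, after: datetime) -> Optional[datetime]:
        times = [t.next_fire_time(after) for t in self.triggers]
        due = [t for t in times if t is not None]
        return min(due) if due else None
```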


HLD (High Level Design)
Now, let's look at a high-level System Design diagram:

[Image: high-level system design diagram]

Microservices that want to schedule a one-time or recurring Job: They can send a Message (or produce, in Kafka terminology) to the corresponding Kafka Queue (more precisely, a Topic).

Job Scheduler Service: Its instances consume the Messages (each one requesting that a Job be enqueued). For every Message, an instance generates a unique Id, e.g. using the Snowflake ID Generation concept. Based on that Id (e.g. by hashing it) it decides which Database Partition the Job goes into. It then creates a Job and a Trigger record in that Database Partition according to the Message.
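A minimal sketch of that step, assuming the commonly described Snowflake layout (41 bits of milliseconds since a custom epoch, 10 bits of worker id, 12 bits of sequence). SnowflakeGenerator, shard_for, the epoch and the shard count are all assumptions made for this example, and crc32 is just one possible hash.

```python
import threading
import time
import zlib


class SnowflakeGenerator:
    EPOCH_MS = 1_609_459_200_000  # 2021-01-01T00:00:00Z, an arbitrary custom epoch

    def __init__(self, worker_id: int) -> None:
        if not 0 <= worker_id < 1024:  # the worker id must fit into 10 bits
            raise ValueError("worker_id must be in [0, 1023]")
        self.worker_id = worker_id
        self._last_ms = -1
        self._sequence = 0
        self._lock = threading.Lock()

    def next_id(self) -> int:
        with self._lock:
            # Never go below the last timestamp, even if the clock steps back.
            now = max(int(time.time() * 1000), self._last_ms)
            if now == self._last_ms:
                self._sequence = (self._sequence + 1) & 0xFFF
                if self._sequence == 0:
                    # 4096 ids used up in this millisecond: wait for the next.
                    while now <= self._last_ms:
                        now = int(time.time() * 1000)
            else:
                self._sequence = 0
            self._last_ms = now
            return ((now - self.EPOCH_MS) << 22) | (self.worker_id << 12) | self._sequence


def shard_for(job_id: int, num_shards: int) -> int:
    """Map a job id to a Database Partition by hashing it."""
    return zlib.crc32(str(job_id).encode("ascii")) % num_shards


generator = SnowflakeGenerator(worker_id=7)
job_id = generator.next_id()
shard = shard_for(job_id, num_shards=8)
```

Hashing the id instead of using it directly spreads consecutive (time-ordered) ids over all partitions. Note that changing num_shards later moves most jobs to a different partition, which is why something like consistent hashing is often considered for that case.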

RDBMS: I chose an RDBMS because we will later need ACID properties, that is, transactions. The database is split into an adequate number of shards (Partitions) to distribute the load and data. Each Partition uses Active-Passive/Master-Slave Replication in a semi-synchronous fashion: one Slave/Follower replicates synchronously while the others receive the Replication Stream asynchronously. That way, we can be sure that at least one Slave holds up-to-date data in case the Master fails (due to network partitions, server outages, etc.), and that Slave will be promoted to be the new Master.

Job Executor Service:
1. On startup, it fetches the Database Partitioning info from ZooKeeper, as well as the Partition Assignment between the other instances and the Database Partitions.
2. It chooses the Database Partition that has the fewest Executors assigned, to balance the number of Executors across all the Database Partitions.
3. It stores its Partition Assignment in ZooKeeper.
4. It constantly sends Heartbeats to ZooKeeper.
5. It pulls information from its Database Partition and fights with the other Executor instances assigned to the same Database Partition for Jobs that are due to execute. The fighting works by using Row Locks, which is why we need transactional properties (hence an RDBMS that supports them; not all really do!).
6. Before an Executor Node executes a Job, it updates the Job record in the DB: it flags the Job as “Running” and stores the “StartTime” and which Executor Node (itself) is executing the Job, etc.
7. When an Executor Node fails, the other Nodes assigned to the same Partition can detect it through ZooKeeper (thanks to the Heartbeats). They can then find all the Jobs that the failed Node was executing (Flag=Running and LastExecutor=Failed Node) and fight for those Jobs to execute them (we could make it configurable per Job whether to retry in case of execution failure).
8. Finally, after executing a Job, whether successfully or not, we publish a Message to another Kafka Queue.
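Steps 5 and 6 are the heart of the design, so here is a small sketch of how an Executor could claim a Job. To keep it runnable with the standard library it uses SQLite, which has no row locks, so the claim is done with a conditional UPDATE (a compare-and-set on the running flag) instead. With MySQL or PostgreSQL the same idea is usually written with SELECT ... FOR UPDATE SKIP LOCKED inside a transaction. The table layout and function names are assumptions made for this example.

```python
import sqlite3
from typing import Optional


def create_job_table(conn: sqlite3.Connection) -> None:
    conn.execute(
        """CREATE TABLE job (
               id              INTEGER PRIMARY KEY,
               name            TEXT NOT NULL,
               due_time        TEXT NOT NULL,  -- ISO-8601, sorts as text
               running         INTEGER NOT NULL DEFAULT 0,
               last_start_time TEXT,
               last_executor   TEXT
           )"""
    )


def claim_due_job(conn: sqlite3.Connection, executor_id: str,
                  now: str) -> Optional[int]:
    """Try to claim one due job; return its id, or None if nothing was claimed."""
    row = conn.execute(
        "SELECT id FROM job WHERE running = 0 AND due_time <= ? "
        "ORDER BY due_time LIMIT 1",
        (now,),
    ).fetchone()
    if row is None:
        return None
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            # The "AND running = 0" guard makes the claim atomic: if another
            # executor flagged the job first, no row is updated.
            "UPDATE job SET running = 1, last_start_time = ?, last_executor = ? "
            "WHERE id = ? AND running = 0",
            (now, executor_id, row[0]),
        )
    return row[0] if cur.rowcount == 1 else None


conn = sqlite3.connect(":memory:")
create_job_table(conn)
conn.execute(
    "INSERT INTO job (name, due_time) VALUES ('send-report', '2021-06-26T12:00:00')"
)
conn.commit()
first = claim_due_job(conn, "executor-a", "2021-06-26T12:00:05")
second = claim_due_job(conn, "executor-b", "2021-06-26T12:00:05")
```

Only the first call gets the job; the second one finds nothing left to claim. An executor that loses the race simply tries again on its next polling round.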

Regarding Point 7 (to expand the horizon on possibilities): We could also use Broadcast Messaging or a Gossip Protocol to detect Node failures. I'd be glad to hear your arguments.
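Whichever mechanism is used, the bookkeeping behind Points 2 and 7 is simple. The sketch below works on plain dictionaries with hypothetical function names; with ZooKeeper itself, liveness is typically tracked with ephemeral nodes that disappear when a session expires, rather than with explicit timestamps.

```python
from typing import Dict, List


def pick_least_loaded_partition(assignments: Dict[str, List[str]]) -> str:
    """Point 2: choose the partition with the fewest executors assigned."""
    # sorted() makes the tie-break deterministic (lowest partition name wins).
    return min(sorted(assignments), key=lambda p: len(assignments[p]))


def find_failed_executors(last_heartbeat: Dict[str, float], now: float,
                          timeout_s: float = 10.0) -> List[str]:
    """Point 7: executors whose last heartbeat is older than the timeout."""
    return sorted(e for e, ts in last_heartbeat.items() if now - ts > timeout_s)


def orphaned_jobs(jobs: List[dict], failed: List[str]) -> List[int]:
    """Jobs flagged as running whose last executor is considered dead."""
    failed_set = set(failed)
    return [j["id"] for j in jobs
            if j["running"] and j["last_executor"] in failed_set]


assignments = {"p0": ["e1", "e2"], "p1": ["e3"], "p2": ["e4"]}
heartbeats = {"e1": 100.0, "e2": 85.0, "e3": 99.0}
jobs = [
    {"id": 1, "running": True, "last_executor": "e2"},
    {"id": 2, "running": True, "last_executor": "e1"},
    {"id": 3, "running": False, "last_executor": "e2"},
]
failed = find_failed_executors(heartbeats, now=100.0)
to_recover = orphaned_jobs(jobs, failed)
```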

Result Handler Service: It consumes the Messages and stores the Execution Results in a NoSQL database like Cassandra. Cassandra has great write throughput because it is Leaderless (no time is lost on failovers, for example) and replicates asynchronously (this can be configured), among other things. We are also okay with Eventual Consistency because it is not crucial if we see a result with some delay.

Coordination Service, ZooKeeper: Stores the above-mentioned information. Regarding the Database Partitioning information: We can e.g. load that info into ZooKeeper from another Service/Configuration file.

Message Queues, in general: We use Message Queues in order to achieve the following. (Here we use Kafka, which is more of an append-only log than a Queue in the conventional sense; you can find great docs on the official website.)

  • Scale the consumer and producer nodes independently (in Kafka, Topics can be horizontally partitioned and scale that way)
  • Decouple the consumer and producer from each other
  • Lower the latency for the producer (it doesn't have to wait for a response)
  • Durability and Reliability: When a Consumer Node crashes, another Node can process the Message which would otherwise be lost (see Offset in Kafka). The Messages are persisted.
  • Throttle/limit the number of messages the consumers process (see Backpressure)
  • Message ordering, which Kafka offers within a partition

Other Issues and Concerns

  • Unreliable Clocks/Time: In a distributed system we have unreliable clocks and time (unbounded delays when requesting NTP time, because we use packet-switched rather than circuit-switched networks; unreliable NTP servers; quartz clocks that drift; etc.). When we want to schedule Jobs reliably and execute them at the right time, clocks and time play an important role. We therefore need to make sure that the clocks on the nodes are synchronized and don't differ too much. One way to achieve that is to use multiple NTP servers and filter out those that deviate too much. Another, more reliable but costly, way is to use Atomic Clocks in the Data Center(s).
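The "filter out those that deviate too much" idea can be sketched as a median-based filter. This is a simplification under my own assumptions (the function names, the 50 ms threshold and the sample values are made up); real NTP clients use more elaborate selection and clustering algorithms.

```python
from statistics import median
from typing import Dict


def filter_ntp_offsets(offsets_ms: Dict[str, float],
                       max_deviation_ms: float = 50.0) -> Dict[str, float]:
    """Keep only the servers whose reported offset is close to the median."""
    mid = median(offsets_ms.values())
    return {server: offset for server, offset in offsets_ms.items()
            if abs(offset - mid) <= max_deviation_ms}


def estimate_clock_offset(offsets_ms: Dict[str, float]) -> float:
    """Estimate the local clock offset from the servers that survive the filter."""
    return median(filter_ntp_offsets(offsets_ms).values())


# Offsets (in ms) reported by four hypothetical servers; "d" is clearly off.
samples = {"a": 3.0, "b": 5.0, "c": 4.0, "d": 900.0}
trusted = filter_ntp_offsets(samples)
offset = estimate_clock_offset(samples)
```

The median is used instead of the mean so that a single wildly wrong server cannot drag the reference point towards itself.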

Final Words:

  • Of course this is a somewhat “simple” approach that would probably fit into a 45-minute system design interview.
  • I’m aware that there are a lot of difficulties in actually building such a system, e.g.: distributing Jobs correctly based on the load/work they have to do (and using the CPUs most efficiently), exactly-once delivery (/execution) and so on. Feel free to start a discussion :).
  • I didn’t go deeply into things like how ZooKeeper or Apache Kafka work, since that would go beyond the scope of this post. I assume you know that those systems are already built in a distributed, fault-tolerant way.

Friday, 25 June 2021

Quiz application database design

I would start with 4 simple tables:

Users

- user_id        auto integer
- regtime        datetime
- username       varchar
- useremail      varchar
- userpass       varchar

Questions

- question_id    auto integer
- question       varchar
- is_active      enum(0,1)

Question_choices

- choice_id        auto integer
- question_id      Questions.question_id
- is_right_choice  enum(0,1)
- choice           varchar

User_question_answers

- user_id        Users.user_id
- question_id    Questions.question_id
- choice_id      Question_choices.choice_id
- is_right       enum(0,1)
- answer_time    datetime

My thoughts on this table design:

  • Table Users is for storing registered users.
  • Table Questions is for storing all your questions.
    • It has is_active so that you can selectively display only active questions (using WHERE is_active = '1').
  • Table Question_choices is for storing all available options. It has is_right_choice, which defines which choice is the right answer for a particular question.
  • Table User_question_answers is for storing the answers from your users.
    • It has is_right for faster lookup, to see whether the chosen answer for that particular question is right (based on the is_right_choice defined previously).
    • It also has answer_time, just to note when that particular user answered the question.
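To see the four tables working together, here is a small sketch using SQLite from Python. It is an adaptation, not the original DDL: enum(0,1) becomes an INTEGER with a CHECK constraint, datetimes are stored as ISO-8601 text, and record_answer / user_score as well as the sample rows are made up for the example.

```python
import sqlite3

SCHEMA = """
CREATE TABLE Users (
    user_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    regtime   TEXT NOT NULL,
    username  TEXT NOT NULL,
    useremail TEXT NOT NULL,
    userpass  TEXT NOT NULL
);
CREATE TABLE Questions (
    question_id INTEGER PRIMARY KEY AUTOINCREMENT,
    question    TEXT NOT NULL,
    is_active   INTEGER NOT NULL CHECK (is_active IN (0, 1))
);
CREATE TABLE Question_choices (
    choice_id       INTEGER PRIMARY KEY AUTOINCREMENT,
    question_id     INTEGER NOT NULL REFERENCES Questions(question_id),
    is_right_choice INTEGER NOT NULL CHECK (is_right_choice IN (0, 1)),
    choice          TEXT NOT NULL
);
CREATE TABLE User_question_answers (
    user_id     INTEGER NOT NULL REFERENCES Users(user_id),
    question_id INTEGER NOT NULL REFERENCES Questions(question_id),
    choice_id   INTEGER NOT NULL REFERENCES Question_choices(choice_id),
    is_right    INTEGER NOT NULL CHECK (is_right IN (0, 1)),
    answer_time TEXT NOT NULL
);
"""


def record_answer(conn, user_id, question_id, choice_id, answer_time):
    """Store an answer and copy is_right_choice into is_right."""
    row = conn.execute(
        "SELECT is_right_choice FROM Question_choices "
        "WHERE choice_id = ? AND question_id = ?",
        (choice_id, question_id),
    ).fetchone()
    if row is None:
        raise ValueError("choice does not belong to this question")
    with conn:  # commit the insert
        conn.execute(
            "INSERT INTO User_question_answers VALUES (?, ?, ?, ?, ?)",
            (user_id, question_id, choice_id, row[0], answer_time),
        )


def user_score(conn, user_id):
    """Number of right answers, read from the denormalised is_right column."""
    return conn.execute(
        "SELECT COALESCE(SUM(is_right), 0) FROM User_question_answers "
        "WHERE user_id = ?",
        (user_id,),
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute(
    "INSERT INTO Users (regtime, username, useremail, userpass) "
    "VALUES ('2021-06-25T10:00:00', 'alice', 'alice@example.com', 'hashed-password')"
)
conn.execute("INSERT INTO Questions (question, is_active) VALUES ('2 + 2 = ?', 1)")
conn.executemany(
    "INSERT INTO Question_choices (question_id, is_right_choice, choice) "
    "VALUES (1, ?, ?)",
    [(0, "3"), (1, "4")],
)
record_answer(conn, user_id=1, question_id=1, choice_id=2,
              answer_time="2021-06-25T10:05:00")
```

Because is_right is copied at answer time, the score query does not need to join Question_choices; the price is that later corrections of is_right_choice are not reflected in answers already stored.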

-------------------------------
Some Improvements:

Table Players:

ID int (primary key, identity)
Name nvarchar(100)

Table Questions:

ID int (primary key, identity)
TextOfTheQuestion nvarchar(100)
correctAnswer int (foreign key to Answers.ID)
a int (foreign key to Answers.ID)
b int (foreign key to Answers.ID)
c int (foreign key to Answers.ID)

Table Answers:

ID int (primary key, identity)
TextOfTheAnswer nvarchar(100)

Table PlayerChoices:

PlayerID int (foreign key to Players.ID)
QuestionID int (foreign key to Questions.ID)
AnswerID int (foreign key to Answers.ID) 

Customer Order Database design

You need four tables, something like this:

[Image: possible simplified database model]

Customers

Contains a list of customers. One row per customer. It would contain all of the customer's information: their contact details, etc.

Orders

Contains a list of orders. One row per order. Each order is placed by a customer and has a Customer_ID, which can be used to link back to the customer record. It might also store the delivery address, if that differs from the address in the customer's record (or addresses could be stored in a separate table).

OrderItems

Contains a list of order items. One row for each item on an order - so each Order can generate multiple rows in this table. Each item ordered is a product from your inventory, so each row has a product_id, which links to the products table.

Products

Contains a list of products. One row per product. Similar to the customers table, but for products - contains all the product details.

Here's the SQL code that you could use to create this structure - it will create a database for itself called mydb:

CREATE SCHEMA IF NOT EXISTS `mydb` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci ;
USE `mydb` ;

-- -----------------------------------------------------
-- Table `mydb`.`Customer`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`Customer` (
  `ID` INT NOT NULL ,
  `Name` TEXT NOT NULL ,
  `PhoneNo` VARCHAR(45) NULL ,
  PRIMARY KEY (`ID`) )
ENGINE = InnoDB;


-- -----------------------------------------------------
-- Table `mydb`.`Order`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`Order` (
  `ID` INT NOT NULL ,
  `customer_id` INT NULL ,
  PRIMARY KEY (`ID`) ,
  INDEX `fk_Order_1_idx` (`customer_id` ASC) ,
  CONSTRAINT `fk_Order_1`
    FOREIGN KEY (`customer_id` )
    REFERENCES `mydb`.`Customer` (`ID` )
    ON DELETE NO ACTION
    ON UPDATE NO ACTION)
ENGINE = InnoDB;


-- -----------------------------------------------------
-- Table `mydb`.`Product`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`Product` (
  `ID` INT NOT NULL ,
  `Name` VARCHAR(45) NOT NULL ,
  `Description` TEXT NULL ,
  PRIMARY KEY (`ID`) )
ENGINE = InnoDB;


-- -----------------------------------------------------
-- Table `mydb`.`OrderItem`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`OrderItem` (
  `ID` INT NOT NULL ,
  `Order_ID` INT NOT NULL ,
  `Product_ID` INT NOT NULL ,
  `Quantity` INT NOT NULL ,
  PRIMARY KEY (`ID`) ,
  INDEX `fk_OrderItem_1_idx` (`Order_ID` ASC) ,
  INDEX `fk_OrderItem_2_idx` (`Product_ID` ASC) ,
  CONSTRAINT `fk_OrderItem_1`
    FOREIGN KEY (`Order_ID` )
    REFERENCES `mydb`.`Order` (`ID` )
    ON DELETE NO ACTION
    ON UPDATE NO ACTION,
  CONSTRAINT `fk_OrderItem_2`
    FOREIGN KEY (`Product_ID` )
    REFERENCES `mydb`.`Product` (`ID` )
    ON DELETE NO ACTION
    ON UPDATE NO ACTION)
ENGINE = InnoDB;

USE `mydb` ;
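To illustrate how the four tables link together at query time, here is a small sketch that recreates a simplified version of the structure in an in-memory SQLite database (not MySQL, so the types and options are reduced) and lists the items of one order. The sample rows and the order_lines helper are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Customer (ID INTEGER PRIMARY KEY, Name TEXT NOT NULL, PhoneNo TEXT);
    -- Order is a reserved word, so the table name has to be quoted.
    CREATE TABLE "Order" (
        ID          INTEGER PRIMARY KEY,
        customer_id INTEGER REFERENCES Customer(ID)
    );
    CREATE TABLE Product (ID INTEGER PRIMARY KEY, Name TEXT NOT NULL, Description TEXT);
    CREATE TABLE OrderItem (
        ID         INTEGER PRIMARY KEY,
        Order_ID   INTEGER NOT NULL REFERENCES "Order"(ID),
        Product_ID INTEGER NOT NULL REFERENCES Product(ID),
        Quantity   INTEGER NOT NULL
    );
    INSERT INTO Customer VALUES (1, 'Alice', NULL);
    INSERT INTO Product VALUES (10, 'Keyboard', NULL), (11, 'Mouse', NULL);
    INSERT INTO "Order" VALUES (100, 1);
    INSERT INTO OrderItem VALUES (1000, 100, 10, 1), (1001, 100, 11, 2);
    """
)


def order_lines(conn, order_id):
    """Return (customer name, product name, quantity) for every item of an order."""
    return conn.execute(
        """SELECT c.Name, p.Name, oi.Quantity
             FROM OrderItem oi
             JOIN "Order"  o ON o.ID = oi.Order_ID
             JOIN Customer c ON c.ID = o.customer_id
             JOIN Product  p ON p.ID = oi.Product_ID
            WHERE o.ID = ?
            ORDER BY oi.ID""",
        (order_id,),
    ).fetchall()


lines = order_lines(conn, 100)
```

OrderItem is the table that resolves the many-to-many relation between orders and products, which is why every query about "what was ordered" goes through it.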