Important things

  • 302 HTTP response with a Location header for URL redirection (GET and HEAD); 307 for temporary redirects that preserve the request method
  • Spring Sleuth - tracing in microservices
  • https://astikanand.github.io/techblogs/high-level-system-design/design-bookmyshow
  • https://www.hellointerview.com/learn/system-design/in-a-hurry/introduction

Saturday, 26 June 2021

Design a Distributed Job Scheduler

 Functional requirements (can vary but I assume the following):

  • A job can be scheduled by other services/microservices for a single execution or for repeated executions (cron job)
  • For each job, a class can be specified that implements some interface like IJob, so that we can later call that interface's method on the worker nodes when we execute the job. (That class can e.g. be present in a .jar file on the worker nodes.)
  • Results of job executions are stored and can be queried

Non-functional requirements (again, can vary but I assume the following):

  • Scalability: Thousands or even millions of jobs can be scheduled and run per day
  • Durability: Jobs must not get lost -> we need to persist jobs
  • Reliability: Jobs must not be executed much later than expected or dropped -> we need a fault-tolerant system
  • Availability: It should always be possible to schedule and execute jobs -> (dynamic) horizontal scaling
  • Jobs must not be executed multiple times (or such occurrences should be kept to a minimum)

Domain Analysis: Concepts
We can define the Domain Model that can later be converted into a Data Model (a database schema or a model for ZooKeeper):

Job:

  • Represents a Job to be executed
  • Properties:
    Id, Name, JobExecutorClass, Priority, Running, LastStartTime, LastEndTime, LastExecutor, Data (Parameters)

Trigger (based on the concept Quartz Scheduler uses):

  • Defines when a Job is executed
  • We can define different Triggers like: OneTimeTrigger, CronTrigger
  • Based on the type, we have properties like:
    Id, Type, StartTime, EndTime, OneTime, Cronjob, Interval

Executor:

  • Is a single Job Executor/Worker Node
  • Can have Properties like:
    Id (e.g. IP-based), LastHeartBeat

The multiplicity between Job and Trigger: Job--1-------*--Trigger (A Job can have multiple Triggers)
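
As a rough sketch, the Job and Trigger concepts could later map to a relational schema like the following; column names and types are adapted from the property lists above, and a few (like NextFireTime) are my own assumptions rather than part of the model:

CREATE TABLE IF NOT EXISTS `Job` (
  `ID` BIGINT NOT NULL,                      -- e.g. a Snowflake-style ID
  `Name` VARCHAR(255) NOT NULL,
  `JobExecutorClass` VARCHAR(255) NOT NULL,  -- class implementing IJob on the worker nodes
  `Priority` INT NOT NULL DEFAULT 0,
  `Running` TINYINT(1) NOT NULL DEFAULT 0,
  `LastStartTime` DATETIME NULL,
  `LastEndTime` DATETIME NULL,
  `LastExecutor` VARCHAR(45) NULL,           -- e.g. the (IP-based) Executor ID
  `Data` TEXT NULL,                          -- serialized job parameters
  PRIMARY KEY (`ID`) )
ENGINE = InnoDB;

CREATE TABLE IF NOT EXISTS `Trigger` (
  `ID` BIGINT NOT NULL,
  `Job_ID` BIGINT NOT NULL,
  `Type` VARCHAR(20) NOT NULL,               -- e.g. 'ONE_TIME' or 'CRON'
  `StartTime` DATETIME NULL,
  `EndTime` DATETIME NULL,
  `NextFireTime` DATETIME NULL,              -- assumption: precomputed next due time, handy for polling
  `CronExpression` VARCHAR(120) NULL,
  `IntervalSeconds` INT NULL,
  PRIMARY KEY (`ID`),
  CONSTRAINT `fk_Trigger_Job`
    FOREIGN KEY (`Job_ID`)
    REFERENCES `Job` (`ID`) )
ENGINE = InnoDB;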


HLD (High Level Design)
Now, let's look at a high-level System Design diagram:

(High-level system design diagram)

Microservices that want to schedule a one-time or recurring Job: They send a Message (or produce, in Kafka terminology) to the corresponding Kafka Queue (more precisely, a Topic).

Job Scheduler Service: Consumes these Messages (each requesting that a Job be scheduled). The instances generate a unique ID, e.g. using the Snowflake ID generation concept. Based on that ID (e.g. by hashing it) they decide into which Database Partition the Job will go, and create the Job and Trigger records described by the Message in that Database Partition.

RDBMS: I chose an RDBMS because we will later need ACID properties, meaning transactions. The database is sharded into an adequate number of shards to distribute the load and data. We use Active-Passive/Master-Slave Replication for each Partition in a semi-synchronous fashion: one Slave/Follower follows synchronously while the others receive the Replication Stream asynchronously. That way, we can be sure that at least one Slave holds up-to-date data in case the Master fails (due to network partitions, server outages, etc.), and that Slave will be promoted to be the new Leader.

Job Executor Service:
1. On startup, it fetches the Database Partitioning info from ZooKeeper as well as the existing assignments between other Executor instances and the Database Partitions.
2. It chooses the Database Partition with the fewest Executors assigned, to balance the number of Executors executing Jobs across all the Database Partitions.
3. It will send/store the Partition Assignment to ZooKeeper.
4. It constantly sends Heartbeats to ZooKeeper.
5. It polls its Database Partition and competes with the other Executor instances assigned to the same Partition for Jobs that are due to execute. The competition works via Row Locks, which is why we need transactional properties (hence an RDBMS that supports them; not all really do!). A SQL sketch of this claiming step is given below.
6. Before an Executor Node executes a Job, it updates the Job record in the DB: flagging it as “Running”, storing the “StartTime” and which Executor Node (itself) is executing the Job, etc.
7. When an Executor Node fails, other Nodes (assigned to the same Partition) can detect it using ZooKeeper (thanks to the Heartbeating). They can then find all the Jobs that the failed Node was executing (Flag=Running and LastExecutor=Failed Node) and compete for those Jobs to execute them (we could make a Job configurable to retry or not in case of execution failure).
8. Finally, after executing a Job (successfully or not), it publishes/produces a Message with the result to another Kafka Queue.

Regarding Point 7 (to broaden the range of possibilities): We could also use Broadcast Messaging or a Gossip Protocol to detect Node failures. I'm excited to hear your reasoning.
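
To illustrate point 5 concretely, here is a minimal SQL sketch of how an Executor could claim due Jobs with row locks. It assumes the hypothetical Job/Trigger tables sketched earlier and an RDBMS that supports SELECT ... FOR UPDATE SKIP LOCKED (e.g. MySQL 8+ or PostgreSQL):

START TRANSACTION;

-- Claim up to 10 due, not-yet-running Jobs in this Partition.
-- SKIP LOCKED makes competing Executors skip rows another instance
-- has already locked instead of blocking on them.
SELECT j.`ID`
FROM `Job` j
JOIN `Trigger` t ON t.`Job_ID` = j.`ID`
WHERE j.`Running` = 0
  AND t.`NextFireTime` <= NOW()
ORDER BY j.`Priority` DESC
LIMIT 10
FOR UPDATE SKIP LOCKED;

-- For each claimed ID, mark the Job as running and record ourselves
-- as its executor (point 6) before doing the actual work.
UPDATE `Job`
SET `Running` = 1,
    `LastStartTime` = NOW(),
    `LastExecutor` = '10.0.0.42'   -- this Executor's (IP-based) ID; placeholder value
WHERE `ID` = 1234;                 -- one of the claimed IDs; placeholder value

COMMIT;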

Result Handler Service: Consumes these Messages and stores the Execution Result in a NoSQL database like Cassandra – Cassandra has great write throughput because it is leaderless (no time lost on failovers, for example), replicates asynchronously (configurable), etc. We are also okay with Eventual Consistency because it is not crucial if we see a result with some delay.

Coordination Service, ZooKeeper: Stores the above mentioned information. Regarding the Database Partitioning information: We can e.g. load that info into ZooKeeper from another Service/Configuration file.

Message Queues, in general: We use Message Queues (here Kafka, which is more of an append-only log than a Queue in the conventional sense; you can find great docs on the official website) in order to:

  • Be able to scale the consumer and producer nodes independently (in Kafka we have Topics, which can be horizontally partitioned to scale that way)
  • Decouple the consumer and producer from each other
  • Lower latency for the producer (doesn't have to wait for a response)
  • Durability and Reliability: When a Consumer Node crashes, another Node can process the Message, which would otherwise be lost (see Offsets in Kafka). The Messages are persisted.
  • We can throttle/limit the number of messages the consumers process (see Backpressure)
  • Kafka offers message ordering

Other Issues and Concerns

  • Unreliable Clocks/Time: In a distributed system we have unreliable clocks and time (due to unbounded delays when requesting NTP time over packet-switched rather than circuit-switched networks, unreliable NTP servers, quartz clocks that drift, etc.). When we want to schedule Jobs reliably and execute them at the right time, clocks and time play an important role. We therefore need to make sure that the clocks on the nodes are synchronized and don't differ too much. One way to achieve that is to use multiple NTP servers and filter out those that deviate too much. Another, more reliable but costly, way is to use Atomic Clocks in the Data Center(s).

Final Words:

  • Of course this is a somewhat “simple” approach that would probably fit into a 45-minute system design interview
  • I’m aware that there are a lot of difficulties in actually building such a system, e.g.: distributing Jobs correctly based on the load/work they have to do (and using the CPUs most efficiently), exactly-once delivery (/execution) and so on. Feel free to start a discussion :).
  • I didn’t go deeply into how ZooKeeper or Apache Kafka work since that would go beyond the scope of this post. I assume that you know that those systems are already built in a distributed, fault-tolerant way.

Friday, 25 June 2021

Quiz application database design

 I would start with 4 simple tables:

Users

- user_id        auto integer
- regtime        datetime
- username       varchar
- useremail      varchar
- userpass       varchar

Questions

- question_id    auto integer
- question       varchar
- is_active      enum(0,1)

Question_choices

- choice_id        auto integer
- question_id      Questions.question_id
- is_right_choice  enum(0,1)
- choice           varchar

User_question_answers

- user_id        Users.user_id
- question_id    Questions.question_id
- choice_id      Question_choices.choice_id
- is_right       enum(0,1)
- answer_time    datetime

My thought on this table design is:

  • table Users is for storing registered users.
  • table Questions is for storing all your questions.
    • It has is_active so that you can selectively display only active questions (using WHERE is_active = '1')
  • table Question_choices is for storing all available options. It has is_right_choice, which defines which choice is the right answer for a particular question.
  • Table User_question_answers is for storing the answers from your users.
    • It has is_right for faster lookups, to see whether that particular question/answer choice is right (based on is_right_choice previously defined).
    • It also has answer_time just to note when that particular user answered the question.
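
For concreteness, here is one possible MySQL version of these four tables; the exact types and the composite primary key are my own assumptions based on the column lists above:

CREATE TABLE Users (
  user_id    INT AUTO_INCREMENT PRIMARY KEY,
  regtime    DATETIME NOT NULL,
  username   VARCHAR(100) NOT NULL,
  useremail  VARCHAR(255) NOT NULL,
  userpass   VARCHAR(255) NOT NULL          -- typically a password hash
);

CREATE TABLE Questions (
  question_id INT AUTO_INCREMENT PRIMARY KEY,
  question    VARCHAR(1000) NOT NULL,
  is_active   ENUM('0','1') NOT NULL DEFAULT '1'
);

CREATE TABLE Question_choices (
  choice_id       INT AUTO_INCREMENT PRIMARY KEY,
  question_id     INT NOT NULL,
  is_right_choice ENUM('0','1') NOT NULL DEFAULT '0',
  choice          VARCHAR(1000) NOT NULL,
  FOREIGN KEY (question_id) REFERENCES Questions (question_id)
);

CREATE TABLE User_question_answers (
  user_id     INT NOT NULL,
  question_id INT NOT NULL,
  choice_id   INT NOT NULL,
  is_right    ENUM('0','1') NOT NULL,
  answer_time DATETIME NOT NULL,
  PRIMARY KEY (user_id, question_id),       -- assumption: one answer per user per question
  FOREIGN KEY (user_id)     REFERENCES Users (user_id),
  FOREIGN KEY (question_id) REFERENCES Questions (question_id),
  FOREIGN KEY (choice_id)   REFERENCES Question_choices (choice_id)
);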

-------------------------------
Some Improvements:

Table Players:

ID int (primary key, identity)
Name nvarchar(100)

Table Questions:

ID int (primary key, identity)
TextOfTheQuestion nvarchar(100)
correctAnswer int (foreign key to Answers.ID)
a int (foreign key to Answers.ID)
b int (foreign key to Answers.ID)
c int (foreign key to Answers.ID)

Table Answers:

ID int (primary key, identity)
TextOfTheAnswer nvarchar(100)

Table PlayerChoices:

PlayerID int (foreign key to Players.ID)
QuestionID int (foreign key to Questions.ID)
AnswerID int (foreign key to Answers.ID) 
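
As a usage sketch against this improved schema (names as defined above), the following query checks whether each recorded player choice was the correct answer:

-- 1 in IsCorrect means the player picked the question's correct answer.
SELECT p.Name,
       q.TextOfTheQuestion,
       (pc.AnswerID = q.correctAnswer) AS IsCorrect
FROM PlayerChoices pc
JOIN Players   p ON p.ID = pc.PlayerID
JOIN Questions q ON q.ID = pc.QuestionID;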

Customer Order Database design

 You need four tables, something like this:

Possible Simplified Database Model

Customers

Contains a list of customers. One row per customer. Would contain all the customer's information - their contact details, etc.

Orders

Contains a list of orders. One row per order. Each order is placed by a customer and has a Customer_ID, which can be used to link back to the customer record. Might also store the delivery address if it differs from the customer's address on their record - or store addresses in a separate table.

OrderItems

Contains a list of order items. One row for each item on an order - so each Order can generate multiple rows in this table. Each item ordered is a product from your inventory, so each row has a product_id, which links to the products table.

Products

Contains a list of products. One row per product. Similar to the customers table, but for products - contains all the product details.

Here's the SQL code that you could use to create this structure - it will create a database for itself called mydb:

CREATE SCHEMA IF NOT EXISTS `mydb` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci ;
USE `mydb` ;

-- -----------------------------------------------------
-- Table `mydb`.`Customer`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`Customer` (
  `ID` INT NOT NULL ,
  `Name` TEXT NOT NULL ,
  `PhoneNo` VARCHAR(45) NULL ,
  PRIMARY KEY (`ID`) )
ENGINE = InnoDB;


-- -----------------------------------------------------
-- Table `mydb`.`Order`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`Order` (
  `ID` INT NOT NULL ,
  `customer_id` INT NULL ,
  PRIMARY KEY (`ID`) ,
  INDEX `fk_Order_1_idx` (`customer_id` ASC) ,
  CONSTRAINT `fk_Order_1`
    FOREIGN KEY (`customer_id` )
    REFERENCES `mydb`.`Customer` (`ID` )
    ON DELETE NO ACTION
    ON UPDATE NO ACTION)
ENGINE = InnoDB;


-- -----------------------------------------------------
-- Table `mydb`.`Product`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`Product` (
  `ID` INT NOT NULL ,
  `Name` VARCHAR(45) NOT NULL ,
  `Description` TEXT NULL ,
  PRIMARY KEY (`ID`) )
ENGINE = InnoDB;


-- -----------------------------------------------------
-- Table `mydb`.`OrderItem`
-- -----------------------------------------------------
CREATE  TABLE IF NOT EXISTS `mydb`.`OrderItem` (
  `ID` INT NOT NULL ,
  `Order_ID` INT NOT NULL ,
  `Product_ID` INT NOT NULL ,
  `Quantity` INT NOT NULL ,
  PRIMARY KEY (`ID`) ,
  INDEX `fk_OrderItem_1_idx` (`Order_ID` ASC) ,
  INDEX `fk_OrderItem_2_idx` (`Product_ID` ASC) ,
  CONSTRAINT `fk_OrderItem_1`
    FOREIGN KEY (`Order_ID` )
    REFERENCES `mydb`.`Order` (`ID` )
    ON DELETE NO ACTION
    ON UPDATE NO ACTION,
  CONSTRAINT `fk_OrderItem_2`
    FOREIGN KEY (`Product_ID` )
    REFERENCES `mydb`.`Product` (`ID` )
    ON DELETE NO ACTION
    ON UPDATE NO ACTION)
ENGINE = InnoDB;

USE `mydb` ;
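
As a usage example against this schema (not part of the generated script), a query like the following joins the four tables to list what each customer ordered:

-- Every item on every order, together with the customer who placed it.
SELECT c.`Name`       AS Customer,
       o.`ID`         AS OrderID,
       p.`Name`       AS Product,
       oi.`Quantity`  AS Quantity
FROM `mydb`.`Customer`  c
JOIN `mydb`.`Order`     o  ON o.`customer_id` = c.`ID`
JOIN `mydb`.`OrderItem` oi ON oi.`Order_ID`   = o.`ID`
JOIN `mydb`.`Product`   p  ON p.`ID`          = oi.`Product_ID`;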

Monday, 21 June 2021

Optimistic and Pessimistic Locking in Databases

Optimistic Locking is a strategy where you read a record, take note of a version number (other methods to do this involve dates, timestamps or checksums/hashes) and check that the version hasn't changed before you write the record back. When you write the record back, you filter the update on the version to make sure it's atomic (i.e. the record hasn't been updated between when you checked the version and when you write it back to disk) and you update the version in one hit.

If the record is dirty (i.e. it has a different version to yours), you abort the transaction and the user can restart it.
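
A minimal SQL sketch of the idea, assuming a hypothetical accounts table that carries a version column:

-- 1) Read the record and remember its version.
SELECT balance, version FROM accounts WHERE id = 42;   -- say this returns version = 7

-- 2) Write it back, filtering the update on the remembered version
--    and bumping the version in the same statement.
UPDATE accounts
SET balance = 100,
    version = version + 1
WHERE id = 42
  AND version = 7;

-- If 0 rows were affected, someone else updated the record in the
-- meantime: the write is rejected and the transaction is restarted.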

This strategy is most applicable to high-volume systems and three-tier architectures where you do not necessarily maintain a connection to the database for your session. In this situation the client cannot actually maintain database locks as the connections are taken from a pool and you may not be using the same connection from one access to the next.

Pessimistic Locking is when you lock the record for your exclusive use until you have finished with it. It has much better integrity than optimistic locking but requires you to be careful with your application design to avoid Deadlocks. To use pessimistic locking you need either a direct connection to the database (as would typically be the case in a two tier client server application) or an externally available transaction ID that can be used independently of the connection.

In the latter case you open the transaction with the TxID and then reconnect using that ID. The DBMS maintains the locks and allows you to pick the session back up through the TxID. This is how distributed transactions using two-phase commit protocols (such as XA or COM+ Transactions) work.
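
And a corresponding pessimistic sketch, again assuming the hypothetical accounts table; SELECT ... FOR UPDATE holds an exclusive row lock until the transaction commits or rolls back:

START TRANSACTION;

-- Lock the row exclusively; other sessions trying to lock or update it
-- will block (or time out) until this transaction ends.
SELECT balance FROM accounts WHERE id = 42 FOR UPDATE;

UPDATE accounts SET balance = 100 WHERE id = 42;

COMMIT;   -- releases the lock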