- Published on
System Design Alex Xu
- Authors
- Name
- Bowen Y
Chapter One: SCALE FROM ZERO TO MILLIONS OF USERS
Cache Eviction Policy
Eviction Policy: Once the cache is full, any requests to add items to the cache might cause existing items to be removed. This is called cache eviction. Least-recently-used (LRU) is the most popular cache eviction policy. Other eviction policies, such as the Least Frequently Used (LFU) or First in First Out (FIFO), can be adopted to satisfy different use cases.
TODO: Implementation?
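One possible answer to the implementation question, as a minimal sketch: an LRU cache built on Python's `collections.OrderedDict`. The class name `LRUCache` and its `get`/`put` methods are illustrative choices, not taken from the book, and the sketch is not thread-safe.

```python
from collections import OrderedDict


class LRUCache:
    """Fixed-capacity cache that evicts the least recently used key."""

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self._items = OrderedDict()  # oldest entry first, newest last

    def get(self, key, default=None):
        if key not in self._items:
            return default
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key, value):
        if key in self._items:
            self._items.move_to_end(key)
        self._items[key] = value
        if len(self._items) > self.capacity:
            self._items.popitem(last=False)  # evict the least recently used entry


cache = LRUCache(capacity=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes the most recently used key
cache.put("c", 3)  # the cache is full, so "b" is evicted
```

Both operations run in O(1) because `OrderedDict` keeps insertion order and can move or pop entries at either end in constant time.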
Dynamic Content Caching in CDN
TODO
https://aws.amazon.com/cloudfront/dynamic-content/?nc1=h_ls
Stateless architecture VS Stateful architecture
Stateful architecture: The issue is that every request from the same client must be routed to the same server. This can be done with sticky sessions in most load balancers.
TODO: Are there real-world use cases for stateful servers?
Stateless architecture: HTTP requests from users can be sent to any web server, which fetches state data from a shared data store. State data is kept out of the web servers.
- Store Session Data in a NoSQL DB
QUESTION: Is NoSQL easier to scale than traditional SQL?
How is the database synchronized across data centers in different physical regions?
https://netflixtechblog.com/active-active-for-multi-regional-resiliency-c47719f6685b
Database Sharding
Each shard shares the same schema, though the actual data on each shard is unique to the shard.
Simple Hash Sharding:
Hash(Partition Key) % NumOfShard -> Different Shard
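The formula above can be sketched as follows. The function name `shard_for` and the choice of MD5 as the hash are assumptions for illustration; any stable hash works. The second half shows why resharding is listed as a challenge: changing the shard count remaps most keys.

```python
import hashlib


def shard_for(partition_key, num_shards):
    """Map a partition key to a shard index with a stable hash."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


keys = [f"user-{i}" for i in range(10_000)]

# Going from 4 to 5 shards changes the modulus, so most keys land elsewhere.
moved = sum(1 for k in keys if shard_for(k, 4) != shard_for(k, 5))
moved_fraction = moved / len(keys)
```

For a uniform hash, a key keeps its shard only when `h % 4 == h % 5`, which holds for 4 of every 20 hash values, so roughly 80% of keys move.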
New Challenges Introduced:
- Resharding data: Consistent hashing
- Celebrity problem: Some shards may be accessed more frequently than others
- Join and de-normalization: A common workaround is to denormalize the database so that queries can be performed in a single table.
Chapter Two: BACK-OF-THE-ENVELOPE ESTIMATION
Latency
Reference: https://colin-scott.github.io/personal_website/research/interactive_latency.html
Service Level Agreement(SLA)
Make Clear Assumptions during Interviews
- QPS
- peak QPS
- storage
- cache
- number of servers
CHAPTER 3: A FRAMEWORK FOR SYSTEM DESIGN INTERVIEWS
What is expected in a system design interview?
- Technical skills
- The ability to cooperate
- The ability to ask good questions
- Real world trade-offs
A 4-step process for effective system design interview
Step 1 - Understand the problem and establish design scope
- Slow down. Think deeply and ask questions to clarify requirements and assumptions.
- What Features? -> Which microservices we need + Database Schema + Data Limitation + Data Flow + API endpoints (Spec)
- What Size? (How many users and what frequency) -> Algorithm Complexity + TPS + Storage + Latency
- Level of Consistency? -> Weak or Strong
- Current technology stack? -> If there is something you can leverage to simplify the design
- Write down your assumptions on the whiteboard
Step 2 - Propose high-level design and get buy-in
- Come up with an initial blueprint for the design. Ask for feedback. Treat your interviewer as a teammate and work together. Many good interviewers love to talk and get involved.
- Draw box diagrams with key components on the whiteboard or paper. This might include clients (mobile/web), APIs, web servers, data stores, cache, CDN, message queue, etc.
- Do back-of-the-envelope calculations to evaluate if your blueprint fits the scale constraints. Think out loud. Communicate with your interviewer if back-of-the-envelope is necessary before diving into it.
- Go through a few concrete use cases to discover edge cases
Step 3 - Design deep dive
- You shall work with the interviewer to identify and prioritize components in the architecture. Different interviewers may have different preferences on different systems.
- For URL shortener, it is interesting to dive into the hash function design that converts a long URL to a short one.
- For a chat system, how to reduce latency and how to support online/offline status are two interesting topics.
- For an e-commerce platform, it is interesting to dive into inventory management and the order processing workflow.
- For a social media platform, how to handle real-time updates for the feed and notifications, and how to implement data partitioning and sharding are two interesting topics.
- For a video streaming service, it is interesting to dive into streaming protocols and techniques for reducing latency.
- For a ride-sharing service, how to handle real-time ride tracking and how to implement the matching algorithm and surge pricing are two interesting topics.
- For an online marketplace, it is interesting to dive into search and recommendation algorithms and fraud detection and prevention.
- For a cloud storage service, how to ensure data redundancy and replication and how to manage efficient file storage and retrieval are two interesting topics.
- For a content management system (CMS), it is interesting to dive into user roles and permissions and performance optimization for content delivery.
- Try not to get into unnecessary details.
Step 4 - Wrap up
In this final step, the interviewer might ask you a few follow-up questions or give you the freedom to discuss other additional points.
- The interviewer might want you to identify the system bottlenecks and discuss potential improvements.
- It could be useful to give the interviewer a recap of your design.
- Error cases (server failure, network loss, etc.) are interesting to talk about.
- Operation issues are worth mentioning. How do you monitor metrics and error logs? How to roll out the system?
- How to handle the next scale curve is also an interesting topic.
Dos
- Always ask for clarification. Do not assume your assumption is correct.
- Understand the requirements of the problem.
- There is neither the right answer nor the best answer. A solution designed to solve the problems of a young startup is different from that of an established company with millions of users. Make sure you understand the requirements.
- Let the interviewer know what you are thinking. Communicate with your interviewer.
- Suggest multiple approaches if possible.
- Once you agree with your interviewer on the blueprint, go into details on each component. Design the most critical components first.
- Bounce ideas off the interviewer. A good interviewer works with you as a teammate.
- Never give up.
Don'ts
- Don't be unprepared for typical interview questions.
- Don’t jump into a solution without clarifying the requirements and assumptions.
- Don’t go into too much detail on a single component in the beginning. Give the high-level design first, then drill down.
- If you get stuck, don't hesitate to ask for hints.
- Again, communicate. Don't think in silence.
- Don’t think your interview is done once you give the design. You are not done until your interviewer says you are done. Ask for feedback early and often.
CHAPTER 4: DESIGN A RATE LIMITER
The benefits of using an API rate limiter:
- Prevent resource starvation caused by Denial of Service (DoS) attack.
- Reduce cost. Limiting excess requests means fewer servers and allocating more resources to high priority APIs.
- Prevent servers from being overloaded.
Step 1 - Understand the problem and establish design scope
- What kind of rate limiter are we going to design? Is it a client-side rate limiter (written in JS and can be bypassed by users who manipulate the code) or server-side API rate limiter?
- Does the rate limiter throttle API requests based on IP, the user ID, or other properties?
- What is the scale of the system? Is it built for a startup or a big company with a large user base?
- Will the system work in a distributed environment?
- Is the rate limiter a separate service or should it be implemented in application code?
- Do we need to inform users who are throttled?
Summary of the requirements:
- Accurately limit excessive requests.
- Low latency. The rate limiter should not slow down HTTP response time.
- Use as little memory as possible.
- Distributed rate limiting. The rate limiter can be shared across multiple servers or processes.
- Exception handling. Show clear exceptions to users when their requests are throttled.
- High fault tolerance. If there are any problems with the rate limiter (for example, a cache server goes offline), it does not affect the entire system.
Step 2 - Propose high-level design and get buy-in
- Where to implement the rate limiter:
- Client Side(not reliable)
- Server Side(Inside API Servers)
- Microservice in the middle: rate limiting is usually implemented within a component called API gateway
So where should the rate limiter be implemented, on the server-side or in a gateway? There is no absolute answer.
- What technology stacks to use:
- Cache service: Redis/Memcache
- Rate Limiting Rules: QPS by account, IP
- Algorithm for rate limiting:
- Token bucket
- Bucket size: the maximum number of tokens allowed in the bucket
- Refill rate: number of tokens put into the bucket every second
- How many buckets do we need?
- Improvement: Lazy Refilling(Refill when receiving new requests) + Automatic Cleanup
- Better in scenarios requiring instant responses.
- AWS API Gateway: https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-request-throttling.html
- Stripe: https://stripe.com/blog/rate-limiters
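A minimal sketch of the token bucket with lazy refilling, assuming one bucket per client and a single process. The class name `TokenBucket`, the `allow` method, and the injectable `clock` parameter are illustrative, not from the book.

```python
import time


class TokenBucket:
    """Token bucket that refills lazily, only when a request arrives."""

    def __init__(self, capacity, refill_rate, clock=time.monotonic):
        self.capacity = capacity        # bucket size: maximum tokens held
        self.refill_rate = refill_rate  # tokens added per second
        self._clock = clock             # injectable so tests can fake time
        self._tokens = float(capacity)  # start with a full bucket
        self._last_refill = clock()

    def allow(self, cost=1):
        now = self._clock()
        elapsed = now - self._last_refill
        # Lazy refill: credit the tokens earned since the last request.
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_rate)
        self._last_refill = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False  # not enough tokens: the request is throttled
```

Because no background timer is needed, an idle bucket costs nothing until its next request, which is what makes periodic cleanup of unused buckets cheap.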
- Leaking bucket
- Bucket size: it is equal to the queue size. The queue holds the requests to be processed at a fixed rate.
- Outflow rate: it defines how many requests can be processed at a fixed rate, usually in seconds.
- But the requests at the end of the queue will be delayed.
- Better in scenarios requiring predictable processing rate.
- Shopify: https://shopify.dev/docs/api/usage/rate-limits
- Fixed window counter
- A counter used in each fixed time window (per minute, per hour...)
- Pros: simplicity and ease of implementation
- Cons: uneven request distribution near the boundary of time windows.
- Widely used in a SaaS platform subscription service.(10000 API calls per month)
- Better for time-sensitive scenarios like subscription, game, voting systems, etc.
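The fixed window counter, and its boundary problem, can be sketched like this. The class name `FixedWindowCounter` and the explicit `now` argument are assumptions made so the example is deterministic; old windows are never cleaned up here, which a real implementation would need to do.

```python
from collections import defaultdict


class FixedWindowCounter:
    """Allow at most `limit` requests per client in each fixed time window."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window_seconds = window_seconds
        self._counts = defaultdict(int)  # (client_id, window index) -> count

    def allow(self, client_id, now):
        window = int(now // self.window_seconds)
        key = (client_id, window)
        if self._counts[key] >= self.limit:
            return False
        self._counts[key] += 1
        return True


limiter = FixedWindowCounter(limit=5, window_seconds=60)
# Five requests at the very end of one window, five at the start of the next:
late = [limiter.allow("u1", 59.0) for _ in range(5)]
early = [limiter.allow("u1", 60.0) for _ in range(5)]
# All ten are allowed within about one second, twice the nominal limit.
```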
- Sliding window log
- Rate limiting implemented by this algorithm is very accurate.
- Require lots of memory and computing for storing and comparing the timestamps of each request
- So, at the cost of more memory and processing power, this algorithm suits financial systems and authentication systems, which are not high-frequency but require accuracy.
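A sketch of the sliding window log for a single client. The class name `SlidingWindowLog` is illustrative. One assumption to flag: this variant records only accepted requests, whereas some descriptions of the algorithm also keep the timestamps of rejected ones.

```python
from collections import deque


class SlidingWindowLog:
    """Keep a timestamp per accepted request; allow at most `limit` per window."""

    def __init__(self, limit, window_seconds):
        self.limit = limit
        self.window_seconds = window_seconds
        self._log = deque()  # timestamps of accepted requests, oldest first

    def allow(self, now):
        cutoff = now - self.window_seconds
        # Drop timestamps that have fallen out of the rolling window.
        while self._log and self._log[0] <= cutoff:
            self._log.popleft()
        if len(self._log) >= self.limit:
            return False
        self._log.append(now)
        return True
```

The memory cost is visible in the code: the log holds up to `limit` timestamps per client, compared with a single integer for the counter-based algorithms.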
- Sliding window counter
- Combination of fixed window counter and sliding window log:
Requests in current window + requests in the previous window * overlap percentage of the rolling window and previous window
- Memory efficient, and it smooths out bursts at the edges of time windows better than the fixed window counter
- An approximation of the actual rate because it assumes requests in the previous window are evenly distributed
- Cloudflare: 0.003% of requests are wrongly allowed or rate limited among 400 million requests
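The sliding window counter formula can be written as a small function. The name `estimated_requests` and its parameters are illustrative; the numbers in the example (5 requests in the previous minute, 3 in the current one, 30% of the way into the current minute) are a made-up worked case.

```python
def estimated_requests(current_count, previous_count, elapsed_in_current, window_seconds):
    """Estimate requests in the rolling window from two fixed-window counters."""
    # Fraction of the rolling window that still overlaps the previous window.
    overlap = (window_seconds - elapsed_in_current) / window_seconds
    return current_count + previous_count * overlap


# 18 seconds (30%) into the current minute, so 70% of the rolling window
# still overlaps the previous minute: 3 + 5 * 0.7 = 6.5
estimate = estimated_requests(current_count=3, previous_count=5,
                              elapsed_in_current=18, window_seconds=60)
```

With a limit of 7 requests per minute, an estimate of 6.5 means the next request is still allowed.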
- High-level design graph
Step 3 - Design deep dive
The high-level design(Rate limiting as the middleware using Redis as the counter storage) does not answer the following questions:
How are rate limiting rules created? Where are the rules stored?
- Rules are generally written in configuration files and saved on disk.
domain: auth
descriptors:
  - key: auth_type
    Value: login
    rate_limit:
      unit: minute
      requests_per_unit: 5
- Afterwards, should the rules be stored in memory or in redis?
- Loading Rules into Memory:
- Environment Variables or In-Memory Storage: When your application starts, you can load the rate limit rules from the YAML file into memory (such as environment variables or a configuration object).
- Advantage: Fast access, minimal latency, and no dependency on external services during rule evaluation.
- Disadvantage: Requires application restart to update rules.
- Caching Rules in Redis:
- Redis Cache: You can load the rate limit rules into Redis when your application starts. The application can then fetch these rules from Redis when needed.
- Advantage: Centralized storage for rules, making it easier to update rules without restarting the application. This is especially useful in distributed systems where multiple instances of your application need to share the same rules.
- Disadvantage: Slightly increased latency compared to in-memory access, but still very fast due to Redis's performance.
How to handle requests that are rate limited?
- In case a request is rate limited, APIs return an HTTP response code 429 (too many requests) to the client.
- Sometimes we need to keep those requests to be processed later.
Rate limiter HTTP headers
- X-Ratelimit-Remaining: The remaining number of allowed requests within the window.
- X-Ratelimit-Limit: It indicates how many calls the client can make per time window.
- X-Ratelimit-Retry-After: The number of seconds to wait until you can make a request again without being throttled.
Detailed design:
Rate limiter in a distributed environment
- Race condition
- Locks are the most obvious solution for race conditions. However, locks will significantly slow down the system.
- Two strategies are commonly used to solve the problem:
- Lua script
- Sorted sets data structure in Redis
- Synchronization issue
- Use centralized data stores like Redis
Step 4 - Wrap up
- Hard vs soft rate limiting
- Hard: The number of requests cannot exceed the threshold.
- Soft: Requests can exceed the threshold for a short period.
- Rate limiting at different levels:
- at the application level (HTTP: layer 7).
- by IP addresses using Iptables (IP: layer 3).
Expansion
How does Redis solve the consistency issue in distributed mode?
Redis Consistency Model
- Eventual Consistency: Redis, by default, does not provide strong consistency. Instead, it operates under an eventual consistency model. This means that after a write operation, there might be a delay before all replicas have the same data as the master. During this period, read operations from replicas might return outdated data.
Master-Slave (Primary-Replica) Replication:
- Mechanism: In Redis, a master (primary) node handles all write operations. One or more slave (replica) nodes replicate data from the master. Replication is typically asynchronous, meaning that there is a delay before the replicas catch up with the master.
- Consistency: Because replication is asynchronous, it is possible for replicas to lag behind the master, leading to a window where reads from replicas might return stale data.
Redis Cluster:
- Sharding: Redis Cluster allows Redis to scale horizontally by partitioning (sharding) the dataset across multiple master nodes. Each master in the cluster is responsible for a subset of the 16384 hash slots, and a key's slot is computed as CRC16(key) mod 16384. This is a fixed hash-slot scheme rather than consistent hashing.
- Groups: Each shard in a Redis Cluster has one master and several replicas (slaves). This master-replica setup within each shard provides redundancy and improves fault tolerance.
- Key Distribution: When a key is stored or accessed, its hash slot determines which shard (master-replica group) is responsible for it. The request is then sent, or redirected, to the master node that owns that slot.
Redis Sentinel:
- Monitoring and Failover: Redis Sentinel is responsible for monitoring the health of the master nodes. If a master node fails, Sentinel coordinates the election of a new master from one of the replicas. This process involves promoting the most up-to-date replica to the master role.
- High Availability: Sentinel helps to ensure high availability by automatically handling failovers, making the Redis deployment more resilient to failures.
CHAPTER 5: DESIGN CONSISTENT HASHING
Consistent hashing is a special kind of hashing such that when a hash table is re-sized and consistent hashing is used, only k/n keys need to be remapped on average, where k is the number of keys, and n is the number of slots. In contrast, in most traditional hash tables, a change in the number of array slots causes nearly all keys to be remapped
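- Traditional Hashing: server_index = hash(key) % node_number
- Consistent Hashing: each node is placed on the ring at hash(node_id), and a key is stored on the first node found walking clockwise from hash(key)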
- Virtual Node in Consistent Hashing: hash(node_id + str(vnode_id))
- Small Clusters (3-10 Nodes): Consider using 100-200 vnodes per node.
- Medium Clusters (10-50 Nodes): 50-150 vnodes per node are usually sufficient
- Large Clusters (50+ Nodes): 20-100 vnodes per node might be appropriate.
- The standard deviation that measures how data are spread out is between 5% (200 virtual nodes) and 10% (100 virtual nodes) of the mean.
- However, more spaces are needed to store data about virtual nodes. This is a tradeoff, and we can tune the number of virtual nodes to fit our system requirements.
- Find affected range
Add a new node onto the ring. The affected range starts from the newly added node (current) and moves anticlockwise around the ring until a server is found (previous). Thus, keys located between previous and current (which originally belonged to next, the node next to current) need to be redistributed to current.
Remove a node from the ring. The affected range starts from the removed node (current) and moves anticlockwise around the ring until a server is found (previous). Thus, keys located between previous and current (which originally belonged to current) need to be redistributed to next.
- Benefits of consistent hashing
Only a minimal number of keys are redistributed when servers are added or removed.
It is easy to scale horizontally because data are more evenly distributed.
Mitigate hotspot key problem. Excessive access to a specific shard could cause server overload. Consistent hashing helps to mitigate the problem by distributing the data more evenly.
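A minimal sketch of a hash ring with virtual nodes, to make the ideas in this chapter concrete. The class name `HashRing`, the MD5-based `_hash` helper, and the `#` separator between node ID and vnode ID are assumptions for illustration (the separator avoids ambiguous concatenations such as "s1" + "10" versus "s11" + "0").

```python
import bisect
import hashlib


def _hash(value):
    """Stable 64-bit position on the ring."""
    return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")


class HashRing:
    def __init__(self, vnodes_per_node=100):
        self.vnodes_per_node = vnodes_per_node
        self._positions = []  # sorted virtual node positions
        self._owner = {}      # position -> physical node ID

    def add_node(self, node_id):
        for vnode_id in range(self.vnodes_per_node):
            position = _hash(f"{node_id}#{vnode_id}")
            self._owner[position] = node_id
            bisect.insort(self._positions, position)

    def remove_node(self, node_id):
        self._positions = [p for p in self._positions if self._owner[p] != node_id]
        self._owner = {p: n for p, n in self._owner.items() if n != node_id}

    def get_node(self, key):
        if not self._positions:
            raise LookupError("ring is empty")
        # First virtual node clockwise from the key, wrapping around the ring.
        index = bisect.bisect_left(self._positions, _hash(key))
        return self._owner[self._positions[index % len(self._positions)]]


ring = HashRing(vnodes_per_node=100)
for node in ("s0", "s1", "s2"):
    ring.add_node(node)

keys = [f"key-{i}" for i in range(10_000)]
before = {k: ring.get_node(k) for k in keys}
ring.add_node("s3")
after = {k: ring.get_node(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
```

Every key in `moved` now belongs to the new node `s3`, and the moved share is close to one quarter of the keys rather than nearly all of them, which is the property the k/n bound describes.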
CHAPTER 6: DESIGN A KEY-VALUE STORE
1. Requirements
- The size of a key-value pair is small: less than 10 KB.
- Ability to store big data.
- High availability: The system responds quickly, even during failures.
- High scalability: The system can be scaled to support large data set.
- Automatic scaling: The addition/deletion of servers should be automatic based on traffic.
- Tunable consistency.
- Low latency.
2. What features are expected in CAP(AP or CP)?
Consistency: consistency means all clients see the same data at the same time no matter which node they connect to.
Availability: availability means any client which requests data gets a response even if some of the nodes are down.
Partition Tolerance: a partition indicates a communication break between two nodes. Partition tolerance means the system continues to operate despite network partitions.
NOTE: Since network failure is unavoidable, a distributed system must tolerate network partition. Thus, a CA system cannot exist in real-world applications.
CP(Strong Consistency): If we choose consistency over availability (CP system), we must block all write operations to n1 and n2 to avoid data inconsistency among these three servers when n3 is offline, which makes the system unavailable.
Examples:
- Paxos
- Raft
- Zookeeper
- Etcd
AP(Eventual Consistency): If we choose availability over consistency (AP system), the system keeps accepting reads, even though it might return stale data. For writes, n1 and n2 will keep accepting writes, and data will be synced to n3 when the network partition is resolved.
Examples:
- Cassandra
- DNS (Domain Name System)
3. System components
Core components and techniques used to build a key-value store:
Component 1: Data partition
- Challenges in partitioning the data:
- Distribute data across multiple servers evenly.
- Minimize data movement when nodes are added or removed.
- Solution: Consistent hashing
- Automatic scaling: servers could be added and removed automatically depending on the load.
- Heterogeneity: the number of virtual nodes for a server is proportional to the server capacity.
Component 2: Data replication
- To achieve high availability and reliability, data must be replicated asynchronously over N servers, where N is a configurable parameter.
- These N servers are chosen using the following logic: after a key is mapped to a position on the hash ring, walk clockwise from that position and choose the first N servers on the ring to store data copies.
- With virtual nodes, the first N nodes on the ring may be owned by fewer than N physical servers. To avoid this issue, we only choose unique servers while performing the clockwise walk logic.
Component 3: Consistency
- N = The number of replicas
- W = A write quorum of size W. When a client performs a write operation, it is sent to at least W nodes. The operation is considered successful only when at least W nodes confirm the write.
- R = A read quorum of size R. When a client reads data, it queries at least R nodes. The read operation is considered successful only when at least R nodes respond.
Question: How to configure N, W, and R to fit our use cases? Here are some of the possible setups:
- If R = 1 and W = N, the system is optimized for a fast read.
- Usage Example: Financial Transaction Processing
- The system needs to ensure that the write operation is strongly consistent across multiple nodes to avoid issues like double spending or incorrect balances.
- TODO: Future Investigation
- If W = 1 and R = N, the system is optimized for fast write.
- Usage Example: Content Delivery Network (CDN) for Static Content
- The system can afford to have a lower W since updates are less frequent, but R needs to be high to ensure users get the most up-to-date content.
- TODO: Future Investigation
- If W = N/2 + 1 and R = N/2 + 1, see Paxos and Raft.
- If W + R > N, strong consistency is guaranteed.
- If W + R <= N, strong consistency is not guaranteed.
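The quorum rule above is simple enough to check in code. The function name `is_strongly_consistent` is illustrative; the reasoning is that when W + R > N, every read quorum overlaps every write quorum in at least one node, so a read always reaches a replica holding the latest acknowledged write.

```python
def is_strongly_consistent(n, w, r):
    """Return True when read and write quorums are guaranteed to overlap."""
    if not (1 <= w <= n and 1 <= r <= n):
        raise ValueError("W and R must be between 1 and N")
    return w + r > n


# With N = 3 replicas:
fast_read = is_strongly_consistent(n=3, w=3, r=1)   # R = 1, W = N
fast_write = is_strongly_consistent(n=3, w=1, r=3)  # W = 1, R = N
majority = is_strongly_consistent(n=3, w=2, r=2)    # W = R = N/2 + 1
weak = is_strongly_consistent(n=3, w=1, r=1)        # W + R <= N
```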
Consistency models
- Strong consistency: any read operation returns a value corresponding to the result of the most updated write data item. A client never sees out-of-date data.
- Weak consistency: subsequent read operations may not see the most updated value.
- Eventual consistency: this is a specific form of weak consistency. Given enough time, all updates are propagated, and all replicas are consistent.
Component 4: Inconsistency resolution: Versioning
- vector clock: a [server, version] pair associated with a data item
- Using vector clocks, it is easy to tell that a version X is an ancestor (i.e. no conflict) of version Y if the version counters for each participant in the vector clock of Y are greater than or equal to the ones in version X. For example, the vector clock D([s0, 1], [s1, 1]) is an ancestor of D([s0, 1], [s1, 2]). Therefore, no conflict is recorded.
- You can tell that a version X is a sibling (i.e., a conflict exists) of Y if there is any participant in Y's vector clock who has a counter that is less than its corresponding counter in X. For example, the following two vector clocks indicate there is a conflict: D([s0, 1], [s1, 2]) and D([s0, 2], [s1, 1]).
- Downsides:
- First, vector clocks add complexity to the client because it needs to implement conflict resolution logic.
- Second, the [server, version] pairs in the vector clock could grow rapidly. However, based on the Dynamo paper, Amazon has not yet encountered this problem in production.
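The ancestor and sibling rules can be sketched as a comparison function. Representing a vector clock as a dict from server ID to counter, the function name `compare`, and its four return labels are assumptions for illustration. A server missing from a clock is treated as counter 0.

```python
def compare(x, y):
    """Compare two vector clocks given as {server_id: counter} dicts."""
    servers = set(x) | set(y)
    x_le_y = all(x.get(s, 0) <= y.get(s, 0) for s in servers)
    y_le_x = all(y.get(s, 0) <= x.get(s, 0) for s in servers)
    if x_le_y and y_le_x:
        return "equal"
    if x_le_y:
        return "ancestor"    # x happened before y: no conflict
    if y_le_x:
        return "descendant"  # y happened before x: no conflict
    return "conflict"        # siblings: each side has a counter the other lacks


no_conflict = compare({"s0": 1, "s1": 1}, {"s0": 1, "s1": 2})
siblings = compare({"s0": 1, "s1": 2}, {"s0": 2, "s1": 1})
```

The two calls reuse the examples from the notes: the first pair is ancestor and descendant, the second pair conflicts and must be reconciled by the client.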
Component 5: Failures
Failure detection
all-to-all multicasting: Inefficient
decentralized failure detection methods: Gossip Protocol
Each node keeps a list of other nodes with a heartbeat counter, which it increases periodically to show it's active.
Nodes randomly share their heartbeat info with a few other nodes, which then pass it on, spreading the information throughout the network.
When a node notices that another node's heartbeat hasn't increased for a while, it assumes that node is offline.
Question: How is the node list of each node determined?
Answer: In the gossip protocol, the list of nodes that each node communicates with is typically determined randomly, not manually.
Each node in the system maintains a membership list of all other nodes, and when it needs to send a heartbeat or other information, it randomly selects a small subset of nodes from this list to communicate with. This selection process is dynamic and happens automatically at each communication interval.
Question: Is there any chance that a healthy node is treated as unhealthy by mistake?
Yes, there is a possibility, albeit a small one, that a network partition or random selection could lead to a situation where a healthy node is incorrectly marked as unhealthy in the gossip protocol.
Handling temporary failures: Sloppy Quorum and Hinted Handoff(The Quorum number is not changed, still W and R)
When some nodes are unreachable due to failures or network issues, sloppy quorum steps in to ensure operations can still proceed:
Write Operations: If the required nodes for a write quorum are unavailable, the system temporarily writes the data to any available nodes, even if they are not part of the original set responsible for the data. These nodes are often referred to as “hinted” nodes.
Read Operations: Similarly, read operations can be satisfied by querying any subset of nodes that hold the relevant data, ensuring that the operation completes even if not all original nodes are reachable.
The data is stored on available nodes, and a “hint” is recorded. This hint indicates that the data should be transferred to the original responsible nodes once they become available again.
These hinted writes ensure that the data is not lost and will eventually reach its intended location.
Higher Availability: By allowing operations to proceed with any available nodes, sloppy quorum significantly improves the system’s availability during partial failures.
Temporary Inconsistency: The system may temporarily have inconsistencies since not all nodes are updated immediately. However, it ensures that these inconsistencies are resolved over time, maintaining eventual consistency.
Potential for Data Loss: If hinted nodes fail before the data can be transferred to the original nodes, there is a risk of data loss unless additional measures (e.g., replication of hints) are taken.
Reference: https://www.geeksforgeeks.org/what-is-sloppy-quorum-and-hinted-handoff/
Handling permanent failures: Anti-Entropy Protocols
Merkle tree: Inconsistency Detection
- Tree-Like Structure: A Merkle tree is a binary tree, where each leaf node represents a hash of a block of data, and each non-leaf (or internal) node is a hash of its child nodes.
- Leaves (Hash(data)): The leaf nodes contain the hash values of the actual data blocks.
- Non-Leaf Nodes (Hash(Hash(Child1) + Hash(Child2))): The internal nodes are created by taking the hash of the concatenation of their child nodes' hash values. This process continues up the tree until you reach the root node.
- It's widely used in systems where data integrity is critical, to ensure that data hasn't been tampered with or to identify which part has been modified.
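A small sketch of computing a Merkle root and using it to detect that two replicas differ. The function name `merkle_root`, the use of SHA-256, and the rule of duplicating the last hash on odd-sized levels are assumptions for illustration; real systems differ in these details.

```python
import hashlib


def _h(data):
    return hashlib.sha256(data).digest()


def merkle_root(blocks):
    """Hash each block, then hash pairs upward until one root hash remains."""
    if not blocks:
        raise ValueError("at least one data block is required")
    level = [_h(block) for block in blocks]  # leaves: Hash(data)
    while len(level) > 1:
        if len(level) % 2 == 1:
            level = level + [level[-1]]  # assumption: pad odd levels
        # internal nodes: Hash(Hash(Child1) + Hash(Child2))
        level = [_h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


replica_a = [b"k1=v1", b"k2=v2", b"k3=v3", b"k4=v4"]
replica_b = [b"k1=v1", b"k2=v2", b"k3=STALE", b"k4=v4"]

in_sync = merkle_root(replica_a) == merkle_root(list(replica_a))
diverged = merkle_root(replica_a) != merkle_root(replica_b)
```

Comparing only the roots tells two replicas whether any block differs; walking down the mismatching subtrees then narrows the difference to specific blocks without transferring the whole dataset.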
TODO: So what's the solution to handle permanent failures? (NOT MENTIONED IN THE BOOK)
- Detection of Failure
- Node Replacement
- Data Rebalancing
- Data Replication
- Consensus Protocols: In some cases, consensus protocols like Raft or Paxos may be used to coordinate the recovery process, ensuring that the system agrees on the state of the data after recovery.
Component 6: System architecture diagram
Clients communicate with the key-value store through simple APIs: get(key) and put(key, value).
A coordinator is a node that acts as a proxy between the client and the key-value store.
Nodes are distributed on a ring using consistent hashing.
The system is completely decentralized so adding and moving nodes can be automatic.
Data is replicated at multiple nodes.
There is no single point of failure as every node has the same set of responsibilities.
Component 7: Write path
The write request is persisted on a commit log file.
Data is saved in the memory cache.
When the memory cache is full or reaches a predefined threshold, data is flushed to SSTable on disk.
TODO: What is a SSTable?
Component 8: Read path
1. The system first checks if data is in memory. If not, go to step 2.
2. If data is not in memory, the system checks the bloom filter.
3. The bloom filter is used to figure out which SSTables might contain the key.
4. SSTables return the result of the data set.
5. The result of the data set is returned to the client.
Reference:
- Cassandra Architecture: https://cassandra.apache.org/doc/stable/cassandra/architecture/
- SStable: https://www.igvita.com/2012/02/06/sstable-and-log-structured-storage-leveldb/
CHAPTER 7: DESIGN A UNIQUE ID GENERATOR IN DISTRIBUTED SYSTEMS
Requirements:
- IDs must be unique.
- IDs are numerical values only.
- IDs fit into 64-bit.
- IDs are ordered by date.
- Ability to generate over 10,000 unique IDs per second.
Possible Solutions:
Multi-master replication: Instead of increasing the next ID by 1, we increase it by k, where k is the number of database servers in use.
UUID: No coordination is required, but a UUID is 128 bits long and non-numeric.
Ticket Server: Use the centralized auto-increment feature of a single database/Redis server (the ticket server).
Twitter snowflake approach
Divide an ID into different sections:
Sign bit: 1 bit. It will always be 0. This is reserved for future uses. It can potentially be used to distinguish between signed and unsigned numbers.
Timestamp: 41 bits. Milliseconds since the epoch or custom epoch.
Datacenter ID: 5 bits, which gives us 2 ^ 5 = 32 datacenters.
Machine ID: 5 bits, which gives us 2 ^ 5 = 32 machines per datacenter.
Sequence number: 12 bits. For every ID generated on that machine/process, the sequence number is incremented by 1. The number is reset to 0 every millisecond.
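The bit layout above can be sketched as a generator. The class name `SnowflakeGenerator`, the `clock_ms` parameter, and the custom epoch value (2020-01-01 UTC, an arbitrary choice) are assumptions for illustration. The sketch is not thread-safe and simply raises if the clock moves backwards.

```python
import time

CUSTOM_EPOCH_MS = 1_577_836_800_000  # 2020-01-01T00:00:00Z, arbitrary choice


class SnowflakeGenerator:
    def __init__(self, datacenter_id, machine_id, clock_ms=None):
        if not 0 <= datacenter_id < 32 or not 0 <= machine_id < 32:
            raise ValueError("datacenter_id and machine_id must fit in 5 bits")
        self.datacenter_id = datacenter_id
        self.machine_id = machine_id
        self._clock_ms = clock_ms or (lambda: int(time.time() * 1000))
        self._last_ms = -1
        self._sequence = 0

    def next_id(self):
        now = self._clock_ms()
        if now < self._last_ms:
            raise RuntimeError("clock moved backwards")
        if now == self._last_ms:
            self._sequence = (self._sequence + 1) & 0xFFF  # 12-bit sequence
            if self._sequence == 0:
                # 4096 IDs already issued this millisecond: wait for the next one.
                while now <= self._last_ms:
                    now = self._clock_ms()
        else:
            self._sequence = 0  # new millisecond, reset the sequence
        self._last_ms = now
        # Layout: 0 | 41-bit timestamp | 5-bit datacenter | 5-bit machine | 12-bit sequence
        return (((now - CUSTOM_EPOCH_MS) << 22) | (self.datacenter_id << 17)
                | (self.machine_id << 12) | self._sequence)
```

Because the timestamp occupies the high bits, IDs from one generator sort by creation time, and 12 sequence bits allow up to 4096 IDs per millisecond per machine.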
Custom Algorithm with Distributed Coordination Systems(Zookeeper, etcd, etc)
More Discussion:
- Clock synchronization: Network Time Protocol is the most popular solution to this problem
- TODO: All the solution listed in the books are silly and not that useful, if you have more time think of something can be used in the real world.
CHAPTER 8: DESIGN A URL SHORTENER
301 redirect. A 301 redirect shows that the requested URL is “permanently” moved to the long URL. Since it is permanently redirected, the browser caches the response, and subsequent requests for the same URL will not be sent to the URL shortening service. Instead, requests are redirected to the long URL server directly.
- Reduce the server load
302 redirect. A 302 redirect means that the URL is “temporarily” moved to the long URL, meaning that subsequent requests for the same URL will be sent to the URL shortening service first. Then, they are redirected to the long URL server.
- Good for analytics
Requirements:
- Write operation: 100 million URLs are generated per day.
- Write operations per second: 100 million / 24 / 3600 ≈ 1,160
- Read operations: Assuming the ratio of reads to writes is 10:1, read operations per second: 1,160 * 10 = 11,600
- Assuming the URL shortener service will run for 10 years, we must support 100 million * 365 * 10 = 365 billion records.
- Assume the average URL length is 100 bytes.
- Storage requirement over 10 years: 365 billion * 100 bytes = 36.5 TB (the 365 billion records already span all 10 years).
Hash function:
The hashValue consists of characters from [0-9, a-z, A-Z], containing 10 + 26 + 26 = 62 possible characters. To figure out the length of hashValue, find the smallest n such that 62^n ≥ 365 billion. Since 62^6 ≈ 56.8 billion and 62^7 ≈ 3.5 trillion, n = 7.
Hash Function 1: hash + collision resolution
- A straightforward solution is to use well-known hash functions like CRC32, MD5, or SHA-1.
- Even the shortest hash value (from CRC32) is too long (more than 7 characters). How can we make it shorter?
- The first approach is to collect the first 7 characters of a hash value; however, this method can lead to hash collisions.
- To resolve hash collisions, we can recursively append a new predefined string until no more collisions are found.
- A Bloom filter plus Redis can be used to accelerate the database lookup.
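A minimal sketch of the "hash + collision resolution" steps above. Several things here are assumptions for illustration: MD5 is picked from the hash functions listed, a plain dict stands in for the database, and the predefined string is "#". Note that a hex digest only uses [0-9a-f], so this sketch does not use the full 62-character alphabet.

```python
import hashlib

SHORT_LEN = 7            # length derived from the estimate above
PREDEFINED_SUFFIX = "#"  # hypothetical predefined string appended on collision


def shorten(long_url, store):
    """Return a 7-character key for long_url and record it in `store`.

    `store` is a dict standing in for the database (short key -> long URL).
    """
    candidate = long_url
    while True:
        digest = hashlib.md5(candidate.encode("utf-8")).hexdigest()
        key = digest[:SHORT_LEN]
        existing = store.get(key)
        if existing is None:
            store[key] = long_url   # free slot: claim it
            return key
        if existing == long_url:
            return key              # same URL was shortened before
        # Collision with a different URL: append the predefined string and retry.
        candidate += PREDEFINED_SUFFIX
```

Every loop iteration costs one lookup in the store, which is the query the Bloom filter is meant to make cheaper.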
Hash Function 2: base 62 conversion
- Use a unique incremental ID generator
- Convert the decimal int to base 62
- Because the IDs are incremental, people can guess the next URL path, so it has potential security issues.
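Base 62 conversion can be sketched as repeated division by 62. The ordering of the alphabet (digits, then lowercase, then uppercase) is an assumption; any fixed ordering works as long as encoding and decoding agree.

```python
ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
BASE = len(ALPHABET)  # 62


def encode_base62(number):
    """Convert a non-negative integer ID to its base 62 string."""
    if number < 0:
        raise ValueError("number must be non-negative")
    if number == 0:
        return ALPHABET[0]
    digits = []
    while number:
        number, remainder = divmod(number, BASE)
        digits.append(ALPHABET[remainder])
    return "".join(reversed(digits))  # most significant digit first


def decode_base62(text):
    """Convert a base 62 string back to the integer ID."""
    number = 0
    for char in text:
        number = number * BASE + ALPHABET.index(char)
    return number


print(encode_base62(11157))  # prints 2TX
```

Unlike the hashing approach, no collision check is needed, because each unique ID maps to exactly one string.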
CHAPTER 9: DESIGN A WEB CRAWLER
A crawler is used for many purposes:
Search engine indexing: This is the most common use case. A crawler collects web pages to create a local index for search engines. For example, Googlebot is the web crawler behind the Google search engine.
Web archiving: This is the process of collecting information from the web to preserve data for future uses. For instance, many national libraries run crawlers to archive web sites. Notable examples are the US Library of Congress and the EU web archive.
Web mining: The explosive growth of the web presents an unprecedented opportunity for data mining. Web mining helps to discover useful knowledge from the internet. For example, top financial firms use crawlers to download shareholder meetings and annual reports to learn key company initiatives.
Web monitoring: The crawlers help to monitor copyright and trademark infringements over the Internet. For example, Digimarc utilizes crawlers to discover pirated works and reports.
The basic algorithm of a web crawler is simple
Given a set of URLs, download all the web pages addressed by the URLs.
Extract URLs from these web pages
Add new URLs to the list of URLs to be downloaded. Repeat these 3 steps.
Ask Questions:
What is the main purpose of the crawler? Is it used for search engine indexing, data mining, or something else?
Requirement: Search engine indexing.
How many web pages does the web crawler collect per month?
Requirement: 1 billion pages.
What content types are included? HTML only or other content types such as PDFs and images as well?
Requirement: HTML only.
Shall we consider newly added or edited web pages?
Requirement: Yes, we should consider the newly added or edited web pages.
Do we need to store HTML pages crawled from the web?
Requirement: Yes, up to 5 years
How do we handle web pages with duplicate content?
Requirement: Pages with duplicate content should be ignored.
Characteristics of the system:
- Scalability: Web crawling should be extremely efficient using parallelization.
- Robustness: Bad HTML, unresponsive servers, crashes, malicious links. The crawler must handle all those edge cases.
- Politeness: The crawler should not make too many requests to a website within a short time interval.
- Extensibility: The system is flexible so that minimal changes are needed to support new content types. For example, if we want to crawl image files in the future, we should not need to redesign the entire system.
Scale Estimation
- QPS: 1,000,000,000 / 30 days / 24 hours / 3600 seconds = ~400 pages per second.
- Assume the average web page size is 2,000 KB (2 MB).
- 1 billion pages x 2 MB = 2 PB of storage per month.
- Assuming data are stored for five years, 2 PB * 12 months * 5 years = 120 PB. A 120 PB storage is needed to store five-year content.
Initial Design
Seed URLs
A web crawler uses seed URLs as a starting point for the crawl process.
To crawl the entire web, we need to be creative in selecting seed URLs. A good seed URL serves as a good starting point that a crawler can utilize to traverse as many links as possible.
The general strategy is to divide the entire URL space into smaller ones. The first proposed approach is based on locality as different countries may have different popular websites. Another way is to choose seed URLs based on topics.
URL Frontier
Most modern web crawlers split the crawl state into two: to be downloaded and already downloaded. The component that stores URLs to be downloaded is called the URL Frontier. You can think of it as a First-in-First-out (FIFO) queue.
HTML Downloader
The HTML downloader downloads web pages from the internet. Those URLs are provided by the URL Frontier.
DNS Resolver(But we don't need to design this right)
To download a web page, a URL must be translated into an IP address. The HTML Downloader calls the DNS Resolver to get the corresponding IP address for the URL.
Content Parser
After a web page is downloaded, it must be parsed and validated because malformed web pages could provoke problems and waste storage space. Implementing a content parser in a crawl server will slow down the crawling process. Thus, the content parser is a separate component.
Content Duplicate Checker
An efficient way to accomplish this task is to compare the hash values of the two web pages.
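A small sketch of hash-based duplicate detection. SHA-256 and the in-memory set are assumptions for the example; the notes only say that hash values are compared. This catches exact duplicates only, not near-duplicates.

```python
import hashlib


class ContentDuplicateChecker:
    def __init__(self):
        self._seen_digests = set()  # digests of pages already stored

    def is_duplicate(self, html):
        """Return True if identical content was seen before; otherwise record it."""
        digest = hashlib.sha256(html.encode("utf-8")).digest()
        if digest in self._seen_digests:
            return True
        self._seen_digests.add(digest)
        return False
```

Comparing 32-byte digests avoids comparing two full pages character by character.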
Content Storage
It is a storage system for storing HTML content.
Most of the content is stored on disk because the data set is too big to fit in memory.
Popular content is kept in memory to reduce latency.
URL Extractor
URL Extractor parses and extracts links from HTML pages. Figure below shows an example of a link extraction process. Relative paths are converted to absolute URLs by adding a prefix.
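As a sketch of link extraction, the standard library's `HTMLParser` and `urljoin` are enough to collect `href` values and resolve relative paths. The class and function names are made up for this example, and only `<a>` tags are handled.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin


class LinkExtractor(HTMLParser):
    """Collects absolute URLs from the href attributes of <a> tags."""

    def __init__(self, base_url):
        super().__init__()
        self.base_url = base_url
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        for name, value in attrs:
            if name == "href" and value:
                # urljoin resolves relative paths against the page URL
                # and leaves absolute URLs unchanged.
                self.links.append(urljoin(self.base_url, value))


def extract_links(base_url, html):
    parser = LinkExtractor(base_url)
    parser.feed(html)
    return parser.links
```

For example, a page at `https://en.wikipedia.org/wiki/Web_crawler` containing `<a href="/wiki/Cong_Yu">` yields `https://en.wikipedia.org/wiki/Cong_Yu`.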
URL Filter
The URL filter excludes certain content types, file extensions, error links and URLs in “blacklisted” sites.
URL Duplicate Checker
It helps to avoid adding the same URL multiple times as this can increase server load and cause potential infinite loops.
Bloom filters and hash tables are common techniques to implement the URL Duplicate Checker component.
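A toy Bloom filter for the "URL seen before?" check. The sizes, the number of hash functions, and the double-hashing trick built on SHA-256 are assumptions chosen for brevity, not tuned values. A Bloom filter can return false positives (an unseen URL reported as seen) but never false negatives.

```python
import hashlib


class BloomFilter:
    def __init__(self, num_bits=1 << 20, num_hashes=5):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item):
        # Derive k bit positions from two 64-bit halves of one digest
        # (double hashing), instead of running k separate hash functions.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        for i in range(self.num_hashes):
            yield (h1 + i * h2) % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))
```

With these defaults the filter uses 128 KB of memory regardless of how long the URLs are, which is the reason to prefer it over a hash table when the URL set is huge.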
URL Storage
URL Storage stores already visited URLs.
Web crawler workflow
Design deep dive
DFS vs BFS
You can think of the web as a directed graph where web pages serve as nodes and hyperlinks (URLs) as edges. The crawl process can be seen as traversing a directed graph from one web page to others. Two common graph traversal algorithms are DFS and BFS. However, DFS is usually not a good choice because the depth of DFS can be very deep.
BFS is commonly used by web crawlers and is implemented by a first-in-first-out (FIFO) queue. In a FIFO queue, URLs are dequeued in the order they are enqueued. However, this implementation has two problems:
Impoliteness: Most links from the same web page are linked back to the same host. When the crawler tries to download web pages in parallel, servers will be flooded with requests. This is considered as “impolite”.
Standard BFS does not take the priority of a URL into consideration. The web is large and not every page has the same level of quality and importance. Therefore, we may want to prioritize URLs according to their page ranks, web traffic, update frequency, etc.
URL frontier
URL frontier helps to address these problems. A URL frontier is a data structure that stores URLs to be downloaded. The URL frontier is an important component to ensure politeness, URL prioritization, and freshness.
Politeness
The general idea of enforcing politeness is to download one page at a time from the same host. A delay can be added between two download tasks. The politeness constraint is implemented by maintaining a mapping from website hostnames to download (worker) threads.
In the book, the author introduces a method that requires a separate queue containing URLs from a single host for each hostname. It also requires one worker per queue. But I think this method is inefficient with resources. Possible improvements:
- Worker Pool
- Custom DelayedQueue
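One way the two improvements could fit together is sketched below: per-host queues plus a heap ordered by the earliest time each host may be contacted again, so that any worker from a shared pool can ask for the next allowed URL. This is my own sketch, not the book's design; the class name and the explicit `now` parameter (used instead of a real clock to keep the example deterministic) are assumptions. A multi-threaded pool would need a lock around both methods.

```python
import heapq
from collections import deque
from urllib.parse import urlsplit


class PoliteFrontier:
    def __init__(self, delay_seconds=1.0):
        self.delay = delay_seconds
        self.queues = {}        # hostname -> deque of pending URLs
        self.next_allowed = {}  # hostname -> earliest time of the next fetch
        self.ready_heap = []    # (ready time, hostname), one entry per queued host

    def add(self, url, now):
        host = urlsplit(url).hostname
        if host not in self.queues:
            self.queues[host] = deque()
            ready_at = max(now, self.next_allowed.get(host, now))
            heapq.heappush(self.ready_heap, (ready_at, host))
        self.queues[host].append(url)

    def next_url(self, now):
        """Return a URL whose host may be fetched at time `now`, or None."""
        if not self.ready_heap or self.ready_heap[0][0] > now:
            return None
        _, host = heapq.heappop(self.ready_heap)
        pending = self.queues[host]
        url = pending.popleft()
        self.next_allowed[host] = now + self.delay
        if pending:
            # The host becomes eligible again only after the delay.
            heapq.heappush(self.ready_heap, (now + self.delay, host))
        else:
            del self.queues[host]
        return url
```

The number of workers is now independent of the number of hosts: an idle worker simply calls `next_url` and sleeps briefly when it returns None.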
Priority
Prioritize URLs based on usefulness, which can be measured by PageRank, website traffic, update frequency, etc. “Prioritizer” is the component that handles URL prioritization.
Prioritizer: It takes URLs as input and computes the priorities.
Queue f1 to fn: Each queue has an assigned priority. Queues with high priority are selected with higher probability.
Queue selector: Randomly choose a queue with a bias towards queues with higher priority.
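The queue selector's biased random choice can be sketched with `random.choices`. The weighting scheme (level 0 is the highest priority and gets the largest weight) and the class name are assumptions; how the Prioritizer maps PageRank or traffic to a level is left out.

```python
import random
from collections import deque


class PriorityFrontQueues:
    def __init__(self, num_levels, rng=None):
        self.queues = [deque() for _ in range(num_levels)]
        # Hypothetical weights: level 0 is picked most often.
        self.weights = [num_levels - level for level in range(num_levels)]
        self.rng = rng or random.Random()

    def put(self, url, priority):
        """Enqueue a URL at the priority level computed by the Prioritizer."""
        self.queues[priority].append(url)

    def get(self):
        """Pick a non-empty queue at random, biased towards high priority."""
        candidates = [i for i, q in enumerate(self.queues) if q]
        if not candidates:
            return None
        weights = [self.weights[i] for i in candidates]
        level = self.rng.choices(candidates, weights=weights, k=1)[0]
        return self.queues[level].popleft()
```

Random selection with a bias, rather than always draining the top queue first, keeps low-priority URLs from starving.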
Freshness
- Recrawl based on web pages’ update history.
- Prioritize URLs and recrawl important pages first and more frequently.
Storage for URL Frontier
- The majority of URLs are stored on disk
- Maintain buffers in memory for enqueue/dequeue operations. Data in the buffer is periodically written to the disk.
Robots Exclusion Protocol
The Robots Exclusion Protocol (REP), commonly referred to as the robots.txt file, is a standard used by websites to communicate with web crawlers and other automated agents about which parts of the site should or should not be accessed. It is a way for site owners to control how search engines and other web crawlers interact with their site.
- Disallow Directive: The Disallow directive specifies which parts of the site crawlers should not access.
- Allow Directive: The Allow directive specifies parts of the site that crawlers are allowed to access.
- Sitemap Directive: The robots.txt file can also include a link to the site's XML sitemap, which helps crawlers understand the structure of the site and locate the most important pages for indexing.
- No Enforcement Mechanism: It’s important to note that the Robots Exclusion Protocol is purely advisory. Compliant crawlers (such as search engines like Google or Bing) follow the rules, but non-compliant crawlers, such as some malicious bots, can ignore the robots.txt file and crawl the site anyway.
- No Indexing Control: The robots.txt file only controls crawling, not indexing. Pages that are disallowed in the robots.txt file may still be indexed if they are linked to from other sites.
Example robots.txt file:
User-agent: *
Disallow: /private/
Allow: /public/
Sitemap: https://www.example.com/sitemap.xml
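A compliant crawler can check these rules with Python's standard `urllib.robotparser` before downloading a page. In this sketch the rules are parsed from a string so the example runs offline; the user agent name "MyCrawler" is made up.

```python
from urllib.robotparser import RobotFileParser

ROBOTS_TXT = """\
User-agent: *
Disallow: /private/
Allow: /public/
Sitemap: https://www.example.com/sitemap.xml
"""

parser = RobotFileParser()
# parse() takes the file's lines; a live crawler would instead call
# set_url(".../robots.txt") followed by read() to download the file.
parser.parse(ROBOTS_TXT.splitlines())

# "MyCrawler" is a hypothetical user agent; it falls under the "*" rules.
print(parser.can_fetch("MyCrawler", "https://www.example.com/private/data.html"))
print(parser.can_fetch("MyCrawler", "https://www.example.com/public/index.html"))
print(parser.site_maps())  # available in Python 3.8+
```

The first call reports that the private path must not be crawled and the second that the public path may be. Crawlers usually cache the parsed result per host to avoid downloading robots.txt repeatedly.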
Performance optimization
Distributed crawl
Cache DNS Resolver
DNS Resolver is a bottleneck for crawlers because DNS requests might take time due to the synchronous nature of many DNS interfaces. DNS response time ranges from 10ms to 200ms. Once a request to DNS is carried out by a crawler thread, other threads are blocked until the first request is completed. Our DNS cache keeps the domain name to IP address mapping and is updated periodically by cron jobs.
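A minimal sketch of a DNS cache. Instead of the cron-based refresh described above, this version swaps in a simpler per-entry time-to-live (TTL); the class name, the 300-second default, and the injectable `resolver` and `clock` parameters are assumptions for the example. It is not thread-safe as written.

```python
import socket
import time


class DnsCache:
    def __init__(self, ttl_seconds=300.0, resolver=socket.gethostbyname,
                 clock=time.monotonic):
        self._ttl = ttl_seconds
        self._resolver = resolver  # blocking lookup: hostname -> IP string
        self._clock = clock
        self._entries = {}         # hostname -> (ip, expiry time)

    def resolve(self, hostname):
        now = self._clock()
        entry = self._entries.get(hostname)
        if entry is not None and entry[1] > now:
            return entry[0]              # cache hit: no network round trip
        ip = self._resolver(hostname)    # cache miss or expired entry
        self._entries[hostname] = (ip, now + self._ttl)
        return ip
```

Only the first lookup for a host pays the 10ms to 200ms cost; later requests within the TTL are served from memory.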
Why Are DNS Requests Synchronous?
Historically, many DNS interfaces were implemented synchronously for simplicity. Early DNS libraries and network protocols often operated in a blocking, synchronous fashion because that was how many networking systems were designed. Some reasons include:
- Legacy Design: Synchronous operations are easier to implement, especially in systems where concurrency and non-blocking I/O were less emphasized.
- Blocking I/O: DNS resolution was typically done through blocking I/O calls, meaning the program would wait until the DNS response came back before moving forward.
- Single Request-Response Nature: DNS queries are usually simple request-response pairs, so they were often implemented as blocking calls, with the assumption that the response would be relatively fast.
Modern Asynchronous DNS Services
- Java: Uses the InetAddress class for synchronous DNS and libraries like DNSJava for more advanced queries. Asynchronous options include Netty DNS Resolver and Vert.x.
- Python: Supports DNS resolution through the socket module for synchronous calls and libraries like dnspython for advanced queries. Asynchronous options include aiohttp and aiodns.
- Go: Has the net package for synchronous DNS resolution and advanced libraries like miekg/dns. Asynchronous DNS can be handled using goroutines.
Locality
Short timeout
Robustness
Besides performance optimization, robustness is also an important consideration. We present a few approaches to improve the system robustness:
Consistent hashing: This helps to distribute loads among downloaders.
Save crawl states and data: To guard against failures, crawl states and data are written to a storage system. A disrupted crawl can be restarted easily by loading saved states and data.
Exception handling: Errors are inevitable and common in a large-scale system. The crawler must handle exceptions gracefully without crashing the system.
Data validation: This is an important measure to prevent system errors.
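To make the consistent hashing item concrete, here is a small hash ring that assigns hostnames to downloaders. The use of MD5, 100 virtual nodes per downloader, and the names are assumptions for the sketch; it also assumes at least one node is configured.

```python
import bisect
import hashlib


def _hash(key):
    # First 8 bytes of MD5 as an integer position on the ring.
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    def __init__(self, nodes, vnodes=100):
        # Each node is placed at `vnodes` positions to even out the load.
        self._ring = sorted(
            (_hash(f"{node}#{i}"), node) for node in nodes for i in range(vnodes)
        )
        self._positions = [position for position, _ in self._ring]

    def node_for(self, key):
        """Return the first node clockwise from the key's position."""
        index = bisect.bisect_right(self._positions, _hash(key)) % len(self._ring)
        return self._ring[index][1]
```

When a downloader is added, only the hostnames that now fall just before one of its ring positions move to it; every other hostname keeps its previous downloader, so most per-host state stays where it is.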
Extensibility
- PNG Downloader module: it is plugged-in to download PNG files.
- Web Monitor module: prevent copyright and trademark infringements.
Detect and avoid problematic content
- Redundant content
- Spider traps
A spider trap is a web page that causes a crawler to get stuck in an infinite loop. For instance, an infinitely deep directory structure looks like this: www.spidertrapexample.com/foo/bar/foo/bar/foo/bar/… Such spider traps can be avoided by setting a maximal length for URLs. However, no one-size-fits-all solution exists to detect spider traps. Websites containing spider traps are easy to identify due to an unusually large number of web pages discovered on such websites. It is hard to develop automatic algorithms to avoid spider traps; however, a user can manually verify and identify a spider trap, and either exclude those websites from the crawler or apply some customized URL filters.
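A customized URL filter of the kind mentioned above might look like the following. The maximal-length rule comes from the text; the repeated-path-segment rule and both threshold values are my own assumptions, and neither rule catches every trap.

```python
from collections import Counter
from urllib.parse import urlsplit

MAX_URL_LENGTH = 2000    # hypothetical cap on URL length
MAX_SEGMENT_REPEATS = 3  # hypothetical cap on repeats of one path segment


def looks_like_spider_trap(url):
    """Heuristic filter: True if the URL should be skipped."""
    if len(url) > MAX_URL_LENGTH:
        return True
    segments = [s for s in urlsplit(url).path.split("/") if s]
    counts = Counter(segments)
    # /foo/bar/foo/bar/foo/bar/... repeats the same segments over and over.
    return any(count > MAX_SEGMENT_REPEATS for count in counts.values())
```

Such a filter would sit alongside the URL Filter component, before a URL is added to the frontier.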
Expansion
- Server-side rendering: Numerous websites use scripts like JavaScript, AJAX, etc to generate links on the fly. If we download and parse web pages directly, we will not be able to retrieve dynamically generated links. To solve this problem, we perform server-side rendering (also called dynamic rendering) first before parsing a page
CHAPTER 10: DESIGN A NOTIFICATION SYSTEM
Ask questions first:
Is it a real-time system?
Requirements: It is a soft real-time system. We want a user to receive notifications as soon as possible. However, if the system is under a high workload, a slight delay is acceptable.
What triggers notifications?
Requirements: Notifications can be triggered by client applications. They can also be scheduled on the server-side.
Will users be able to opt-out?
Requirements: Yes, users who choose to opt-out will no longer receive notifications.
How many notifications are sent out each day?
Requirements: 10 million mobile push notifications, 1 million SMS messages, and 5 million emails.
High-level design
It is structured as follows:
Different types of notifications
Contact info gathering flow
Notification sending/receiving flow
Different types of notifications
iOS push notification
Provider: A provider builds and sends notification requests to Apple Push Notification Service (APNS). To construct a push notification, the provider provides the following data:
- Device token: This is a unique identifier used for sending push notifications.
- Payload: This is a JSON dictionary that contains a notification’s payload.
{
  "aps": {
    "alert": {
      "title": "Game Request",
      "body": "Bob wants to play chess",
      "action-loc-key": "PLAY"
    },
    "badge": 5
  }
}
APNS: This is a remote service provided by Apple to propagate push notifications to iOS devices.
iOS Device: It is the end client, which receives push notifications.
Android push notification
Instead of using APNS, Firebase Cloud Messaging (FCM) is commonly used to send push notifications to Android devices.
SMS message
For SMS messages, third party SMS services like Twilio [1], Nexmo [2], and many others are commonly used.
Email
Although companies can set up their own email servers, many of them opt for commercial email services. Sendgrid and Mailchimp are among the most popular email services, which offer a better delivery rate and data analytics.
Contact info gathering flow
To send notifications, we need to gather mobile device tokens, phone numbers, or email addresses. When a user installs our app or signs up for the first time, API servers collect user contact info and store it in the database.
Notification sending/receiving flow
Service 1 to N
They represent different services that send notifications via APIs provided by notification servers.
Notification servers
Provide APIs for services to send notifications. Those APIs are only accessible internally or by verified clients to prevent spam.
Carry out basic validations to verify emails, phone numbers, etc.
Query the database or cache to fetch data needed to render a notification.
Put notification data into message queues for parallel processing.
Deep Dive
Reliability
How to prevent data loss?
One of the most important requirements in a notification system is that it cannot lose data. Notifications can usually be delayed or re-ordered, but never lost. To satisfy this requirement, the notification system persists notification data in a database and implements a retry mechanism. The notification log database is included for data persistence
Will recipients receive a notification exactly once?
The short answer is no. Although a notification is delivered exactly once most of the time, the distributed nature of the system could result in duplicate notifications. To reduce how often duplicates occur, we introduce a dedupe mechanism and handle each failure case carefully. Here is a simple dedupe logic:
When a notification event first arrives, we check whether it has been seen before by checking the event ID. If it has been seen before, it is discarded. Otherwise, we send out the notification.
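The dedupe logic can be sketched as a set of seen event IDs. The in-memory set is an assumption for the example; with several workers the set would have to live in shared storage, and entries would need to expire at some point.

```python
class NotificationDeduper:
    def __init__(self):
        self._seen_event_ids = set()

    def should_send(self, event_id):
        """Return True the first time an event ID is seen, False afterwards."""
        if event_id in self._seen_event_ids:
            return False  # duplicate: discard
        self._seen_event_ids.add(event_id)
        return True
```

This reduces duplicates but does not give exactly-once delivery: a worker can still crash after sending and before the send is acknowledged, so the retry may deliver the same notification again.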
Reference: https://bravenewgeek.com/you-cannot-have-exactly-once-delivery/
Additional components and considerations
Notification setting
Many websites and apps give users fine-grained control over notification settings.
Rate limiting
Retry mechanism
When a third-party service fails to send a notification, the notification will be added to the message queue for retrying. If the problem persists, an alert will be sent out to developers.
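A rough sketch of the retry flow: a failed notification goes back on the queue, and after too many attempts an alert is raised instead. The attempt limit, the function name, and the `send` and `alert` callbacks are all hypothetical; a real system would also wait (for example with exponential backoff) between attempts.

```python
from collections import deque

MAX_ATTEMPTS = 3  # hypothetical limit before developers are alerted


def process_queue(queue, send, alert):
    """Drain a deque of (notification, attempts) pairs.

    `send` calls the third-party service and raises on failure;
    `alert` notifies developers about a notification that keeps failing.
    """
    while queue:
        notification, attempts = queue.popleft()
        try:
            send(notification)
        except Exception:
            attempts += 1
            if attempts >= MAX_ATTEMPTS:
                alert(notification)                      # problem persists
            else:
                queue.append((notification, attempts))   # retry later
```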
Security in push notifications
For iOS or Android apps, appKey and appSecret are used to secure push notification APIs. Only authenticated or verified clients are allowed to send push notifications using our APIs.
Monitor queued notifications
A key metric to monitor is the total number of queued notifications. If the number is large, the notification events are not processed fast enough by workers. To avoid delay in the notification delivery, more workers are needed.
Events tracking
TODO: How do we trigger an event when a user views a notification but doesn't do anything else?
CHAPTER 11: DESIGN A NEWS FEED SYSTEM
Ask Questions First
Is this a mobile app? Or a web app? Or both?
Requirements: Both
What are the important features?
Requirements: A user can publish a post and see her friends’ posts on the news feed page.
Is the news feed sorted by reverse chronological order or any particular order such as topic scores?
Requirements: To keep things simple, let us assume the feed is sorted by reverse chronological order.
How many friends can a user have?
Requirements: 5000
What is the traffic volume?
Requirements: 10 million DAU
Can feed contain images, videos, or just text?
Requirements: It can contain media files, including both images and videos.
High Level Design
The design is divided into two flows: feed publishing and news feed building.
- Feed publishing: when a user publishes a post, the corresponding data is written into the cache and database. The post is then propagated to her friends’ news feeds.
- Newsfeed building: for simplicity, let us assume the news feed is built by aggregating friends’ posts in reverse chronological order.
Newsfeed APIs
Feed publishing API: To publish a post, an HTTP POST request is sent to the server:
POST /v1/me/feed
- content: content is the text of the post.
- auth_token: used to authenticate API requests.
Newsfeed retrieval API: The API to retrieve the news feed:
GET /v1/me/feed
- auth_token: used to authenticate API requests.
Feed publishing
- Post service: persist post in the database and cache.
- Fanout service: push new content to friends’ news feed. Newsfeed data is stored in the cache for fast retrieval.
- Notification service: inform friends that new content is available and send out push notifications.
Newsfeed building
- Newsfeed service: news feed service fetches news feed from the cache.
- Newsfeed cache: store news feed IDs needed to render the news feed.
Deep Dive
Feed publishing
Fanout service:
Fanout is the process of delivering a post to all friends. Two types of fanout models are: fanout on write (also called push model) and fanout on read (also called pull model).
Fanout on write. With this approach, news feed is pre-computed during write time. A new post is delivered to friends’ cache immediately after it is published.
Pros:
The news feed is generated in real-time and can be pushed to friends immediately.
Fetching news feed is fast because the news feed is pre-computed during write time.
Cons:
If a user has many friends, fetching the friend list and generating news feeds for all of them is slow and time consuming. This is called the hotkey problem.
For inactive users or those who rarely log in, pre-computing news feeds wastes computing resources.
Fanout on read. The news feed is generated during read time. This is an on-demand model. Recent posts are pulled when a user loads her home page.
Pros:
For inactive users or those who rarely log in, fanout on read works better because it will not waste computing resources on them.
Data is not pushed to friends so there is no hotkey problem.
Cons:
- Fetching the news feed is slow as the news feed is not pre-computed.
Difference between a notification system and a content feeding system
1. Primary Purpose
Notification System:
- Goal: Alert users in real-time about events such as new posts, interactions (likes, comments, retweets), or updates.
- Example: Sending a push notification when a celebrity posts a new tweet, even if the user is not actively using the app.
Content Feeding System:
- Goal: Serve users with the actual content they want to see, such as posts from people they follow, organized in a timeline or feed.
- Example: Loading and displaying a user’s news feed when they open the app.
2. Mechanism
Notification System:
- Push-Based: Notifications are sent from the server directly to the user's device as soon as an event happens, regardless of whether the app is running or not.
- Real-Time: Designed to deliver immediate alerts and updates, triggering device notifications (e.g., banner, sound) to get the user’s attention.
Content Feeding System:
- Hybrid (Push/Pull) Model:
- Push Model (Fanout on Write): Pre-computes and stores content in the user’s feed at the time of the post, so it's ready when they open the app.
- Pull Model (Fanout on Read): Retrieves content on demand when the user opens the app, fetching recent posts dynamically.
3. Resource Usage
Notification System:
- Lightweight: Only metadata about the event (e.g., "User X posted a new tweet") is sent, which consumes minimal resources and bandwidth.
- Low Latency: Focuses on real-time delivery with minimal delay to ensure users receive alerts promptly.
Content Feeding System:
- Heavier: Involves delivering large amounts of content (e.g., posts, images, videos), requiring more storage, bandwidth, and processing power, especially if it pre-computes and caches the content for users.
- Latency Can Vary: Latency depends on whether content is pushed and precomputed (fast retrieval) or pulled on demand (slower due to real-time fetching).
4. User Interaction
Notification System:
- Passive Engagement: Users are alerted to new content or interactions without needing to open the app first. They can decide whether to open the app and engage with the content based on the notification.
Content Feeding System:
- Active Engagement: Users must open the app and interact with the feed to view and consume the content. The system is designed to deliver a seamless experience when the user engages directly with the app.
5. Independence
Notification System:
- Independent of Content Loading: The notification system operates separately from the content feeding system, meaning notifications are delivered even if the content itself is not yet loaded or fetched.
Content Feeding System:
- Dependent on Feed Model: The way content is delivered depends on whether the system uses a push or pull model, influencing how quickly and efficiently content is loaded when the user interacts with the feed.
6. Examples
Notification System:
- Push notifications for new posts, comments, or likes that pop up on your phone even if the app is closed.
Content Feeding System:
- The posts you see in your timeline when you open apps like Twitter, Instagram, or Facebook, either pre-computed and ready to view or fetched on demand.
Summary:
- Notification System: Push-based, real-time, lightweight alerts sent to users regardless of whether the app is open or closed.
- Content Feeding System: Responsible for delivering and rendering the actual content in a user’s feed, using either a push model (precomputed) or a pull model (on-demand).
We adopt a hybrid approach to get benefits of both approaches and avoid pitfalls in them. Since fetching the news feed fast is crucial, we use a push model for the majority of users. For celebrities or users who have many friends/followers, we let followers pull news content on-demand to avoid system overload. Consistent hashing is a useful technique to mitigate the hotkey problem as it helps to distribute requests/data more evenly.
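The hybrid approach can be sketched with in-memory dictionaries: posts from ordinary users are pushed into followers' feeds at write time, while posts from celebrities are pulled and merged at read time. Everything below (the class, the tiny follower threshold, the data layout) is an assumption made to keep the example short.

```python
CELEBRITY_THRESHOLD = 3  # hypothetical; real systems would use a far larger number


class HybridFeed:
    def __init__(self):
        self.followers = {}        # author -> set of follower IDs
        self.posts_by_author = {}  # author -> list of (timestamp, post_id)
        self.feed_cache = {}       # user -> list of (timestamp, post_id), pushed

    def follow(self, follower, author):
        self.followers.setdefault(author, set()).add(follower)

    def is_celebrity(self, author):
        return len(self.followers.get(author, ())) >= CELEBRITY_THRESHOLD

    def publish(self, author, post_id, timestamp):
        self.posts_by_author.setdefault(author, []).append((timestamp, post_id))
        if self.is_celebrity(author):
            return  # fanout on read: followers pull this post later
        for follower in self.followers.get(author, ()):
            # fanout on write: push into each follower's precomputed feed
            self.feed_cache.setdefault(follower, []).append((timestamp, post_id))

    def read_feed(self, user, limit=10):
        items = set(self.feed_cache.get(user, []))
        for author, followers in self.followers.items():
            if user in followers and self.is_celebrity(author):
                items.update(self.posts_by_author.get(author, []))
        # Reverse chronological order; the set removes any post present twice.
        return [post_id for _, post_id in sorted(items, reverse=True)[:limit]]
```

Scanning every author in `read_feed` is only acceptable in a toy; a real service would keep a per-user list of followed celebrities.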
1. Pushing Content to Others' News Feeds:
- In social media applications, each user typically has a personal news feed list, which contains posts from the accounts they follow. When you "push content" to a follower’s feed, it means that when a user creates a post, that post is immediately inserted into the news feed lists of all their followers.
- These personal news feed lists are usually stored in a database or cache (e.g., Redis) and are retrieved whenever the user requests their feed. For example, if Alice posts something, the system inserts Alice's post into the personal feeds of Bob, Carol, and all her other followers.
- Do feeds grow infinitely? Not usually. These lists are not stored infinitely; instead, social media platforms implement mechanisms to expire older content from the cache or database to avoid excessive storage growth. For example:
- TTL (Time to Live): Older posts might be automatically removed after a certain time period, such as after 30 days.
- Fixed-size lists: Only the most recent posts are kept in the user's feed (e.g., the last 100 posts), and older ones are deleted.
- Archival: Older content may be archived to cheaper, slower storage for long-term access but is not kept in the active cache.
- Cleanup policies: Periodic cleanup processes may be scheduled to remove inactive or outdated posts from news feeds.
2. Does Fanout on Write Require Users to Log In?
- No, Fanout on Write does not require users to log in to receive new posts in their feed. When a user creates a post, the system pushes that post to their followers’ news feed whether they are logged in or not.
- The primary benefit of this approach is that when users do log in, their news feeds are already pre-populated with the latest posts. This makes fetching the feed instantaneous, as the posts are already sitting in the cache or database.
Why not trigger a fetch request when users log in?
- Pre-computation Advantage: Fanout on Write pre-computes the feed during the post creation process (write time), so users experience minimal latency when loading their feed. If you were to trigger a fetch request upon login (which is closer to a Fanout on Read model), the system would need to retrieve posts from all the accounts the user follows, sort them, and then render the feed in real-time, which introduces higher latency.
- Instant Experience: Fanout on Write is designed for a real-time, instant experience, ensuring that even if a user has many followers or follows many people, their news feed is ready and waiting as soon as they log in.
Fanout Workers in Detail
Fanout workers play a crucial role in distributing posts to users' news feeds. They act as the processing layer between the message queue and the news feed cache. Here's a step-by-step breakdown of how they work:
1. Receiving the Task
- Fanout workers are subscribed to a message queue (e.g., RabbitMQ, Kafka). After a new post is published, the list of friends (or followers) and the post ID are sent to the message queue.
- The workers fetch the task from the queue, which typically includes the post ID and a list of user IDs (friends or followers).
2. Processing the Fanout
- The fanout worker processes each task by iterating through the list of user IDs.
- For each user ID, the worker creates an entry in the news feed cache linking the post to that user. This is usually stored as a <post_id, user_id> pair, which acts as a reference to the post rather than storing the full content (to save memory).
3. Efficient Storage
- Instead of storing the complete post object and user object, only IDs are stored in the cache. This drastically reduces memory usage. The actual post data (e.g., text, images) is retrieved later from a separate data store when a user views the post.
- The worker appends the new <post_id, user_id> pair to the news feed cache for each follower of the original poster.
4. Cache Management
- To manage memory usage, the news feed cache is typically bounded by a configurable limit. For example, each user's feed might store only the most recent 100 posts. Older posts may be removed or archived to prevent excessive memory consumption.
- The workers ensure that the cache maintains a fixed size by either removing the oldest posts when new ones are added or by setting a time-to-live (TTL) policy for how long posts are kept.
5. Parallel Processing
- In large-scale systems, there may be multiple fanout workers operating in parallel. This allows the system to handle large volumes of posts efficiently, even if the poster has millions of followers.
- Tasks are typically distributed across the workers, allowing them to independently process different batches of followers, which speeds up the fanout process.
6. Fault Tolerance
- Fanout workers are designed to be fault-tolerant. If a worker fails during the task, the message queue can reassign the task to another worker to ensure that no posts are lost.
- This ensures that all followers eventually receive the post, even if a worker fails.
Summary
- Fanout workers take the list of user IDs and post ID from the message queue.
- They efficiently store only the IDs in the cache to keep memory usage low.
- They maintain the news feed cache within a fixed size and process tasks in parallel to handle scale.
- Fault tolerance mechanisms ensure that tasks are completed even if individual workers fail.
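The fanout worker steps above can be sketched with the standard library: `queue.Queue` stands in for the message queue (RabbitMQ or Kafka in practice), and a dict of bounded deques stands in for the news feed cache. The function name and the limit of 100 are taken as assumptions from the examples in the text.

```python
import queue
from collections import deque

FEED_LIMIT = 100  # keep only the most recent 100 post IDs per user


def fanout_worker(tasks, feed_cache):
    """Drain `tasks`, a queue of (post_id, user_ids) pairs, into feed_cache.

    feed_cache maps user_id -> deque of post IDs, newest first. Only IDs
    are stored; full post objects live in a separate store.
    """
    while True:
        try:
            post_id, user_ids = tasks.get_nowait()
        except queue.Empty:
            return  # nothing left; a long-running worker would block instead
        for user_id in user_ids:
            feed = feed_cache.setdefault(user_id, deque(maxlen=FEED_LIMIT))
            # With maxlen set, appendleft drops the oldest ID from the right.
            feed.appendleft(post_id)
        tasks.task_done()
```

Running several such workers in parallel threads would need the cache writes to be synchronized or delegated to a store such as Redis; this sketch runs a single worker.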
Newsfeed retrieval
A user sends a request to retrieve her news feed. The request looks like this: /v1/me/feed
The load balancer redistributes requests to web servers.
Web servers call the news feed service to fetch news feeds.
News feed service gets a list of post IDs from the news feed cache.
A user’s news feed is more than just a list of feed IDs. It contains username, profile picture, post content, post image, etc. Thus, the news feed service fetches the complete user and post objects from caches (user cache and post cache) to construct the fully hydrated news feed.
The fully hydrated news feed is returned in JSON format back to the client for rendering.
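The hydration step can be sketched as follows. The three caches are modelled as plain dictionaries, and the field names (`author_id`, `content`, `name`, `avatar`) are assumptions made for the example, not a schema from the book.

```python
import json


def hydrate_feed(user_id, feed_cache, post_cache, user_cache, limit=20):
    """Turn the cached post IDs for one user into full feed items."""
    items = []
    for post_id in feed_cache.get(user_id, [])[:limit]:
        post = post_cache.get(post_id)
        if post is None:
            # Post deleted or evicted; a real service might fall back to the DB.
            continue
        author = user_cache.get(post["author_id"], {})
        items.append({
            "post_id": post_id,
            "content": post["content"],
            "author_name": author.get("name"),
            "author_avatar": author.get("avatar"),
        })
    return items


def feed_response(user_id, feed_cache, post_cache, user_cache):
    """Serialize the hydrated feed as the JSON body returned to the client."""
    feed = hydrate_feed(user_id, feed_cache, post_cache, user_cache)
    return json.dumps({"feed": feed})
```

Keeping only IDs in the feed cache means an edited post or a changed profile picture is picked up at read time, because the full objects are looked up on every request.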
Expansion
1. Are Posts Strictly Chronological?
- No. The posts a user sees on platforms like Twitter and Instagram are not always strictly chronological. These platforms often use ranking algorithms to decide the order in which posts are presented, weighing factors such as engagement, relevance, and user interactions above strict time order. Posts from close friends may appear promptly, while posts from less engaged accounts or celebrities may be pulled in and shown according to what the platform considers most relevant to the user.
2. When Should You Trigger Fanout on Read?
- Triggering Fanout on Read: In a hybrid model, fanout on read should be triggered when the user logs in and requests their feed but the cache contains only partial data from fanout on write (for example, it lacks posts from celebrity accounts that were excluded from fanout on write).
- Return Fanout on Write Data First vs. Combining Data:
- Option 1: Return Fanout on Write Data First: You could return the fanout on write data (e.g., posts from close friends) immediately to reduce latency and allow users to see some content right away. Meanwhile, you would trigger fanout on read in the background and fetch additional posts (e.g., from celebrities) asynchronously. Once the additional posts are retrieved, you could either update the feed in real-time or load more content as the user scrolls.
- Option 2: Wait and Combine Data: Alternatively, you could wait until both fanout on write and fanout on read data are combined before sending the complete feed to the user. This ensures a more seamless experience but could introduce additional latency, as the user will have to wait until all data has been fetched and merged.
The choice depends on your user experience goals. If you prioritize instant loading, returning fanout on write data first is preferable. If you want a more complete feed from the start, you may opt to wait for fanout on read.
3. How Should the Server Retrieve Missing Data Not in the Fanout on Write Cache or DB?
- When the server detects that certain posts are missing from the fanout on write cache, it needs to run a fanout on read operation. Here’s how it can retrieve the missing data:
- Identify Missing Posts: The server first determines which posts are missing. This can be done by checking the last post timestamp or post sequence numbers to identify gaps in the feed. If certain posts are not found in the cache, they are flagged for retrieval.
- Query the Primary Data Store: For posts that were not cached during fanout on write, the server queries the primary database (where all posts are stored) to fetch recent posts from the accounts the user follows (e.g., celebrities or distant connections).
- Merge with Cache Data: Once the missing posts are retrieved from the database, they are merged with the cached data from fanout on write, ensuring that the final feed includes all relevant posts.
- Return Data to the User: Depending on the system design, this data can either be returned immediately once complete or loaded incrementally as the user scrolls through their feed.
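The merge step can be sketched as below, assuming each post is a dictionary with a unique `post_id` and a numeric `created_at` timestamp (both field names are hypothetical). Duplicates are collapsed by ID and the result is ordered newest first.

```python
def merge_feeds(cached_posts, pulled_posts, limit=20):
    """Combine fanout-on-write and fanout-on-read posts into one feed."""
    by_id = {}
    for post in list(cached_posts) + list(pulled_posts):
        # A post present in both sources is kept only once.
        by_id[post["post_id"]] = post
    merged = sorted(by_id.values(), key=lambda p: p["created_at"], reverse=True)
    return merged[:limit]
```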
4. How Can We Know What Data Should Be Retrieved During the Fanout on Read Phase?
To effectively retrieve the missing data during the fanout on read phase, the system can follow these steps:
Tracking Gaps in the Cache:
- Cache Metadata: The system can store metadata in the user’s feed cache, such as last update timestamps, post sequence numbers, or content source tags (e.g., marking whether a post was retrieved via fanout on write or fanout on read).
- This metadata can be used to track which posts were pushed during fanout on write and identify the content that was not included (such as posts from high-volume users or celebrities).
Querying the Relevant Data Sources:
- Identify Followed Accounts Excluded from Fanout on Write: During the fanout on read phase, the system checks the list of followed accounts that were excluded from fanout on write (e.g., celebrities, users with a large following). These accounts are flagged as requiring additional retrieval.
- Time-Based Queries: Using the last update timestamp or post sequence number, the system queries the database or a specialized data store to retrieve any new posts from these excluded accounts that were published after the user’s cache was last updated.
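A minimal sketch of these two steps, with the database replaced by a dictionary keyed by author. The names `pull_accounts`, `posts_by_author`, and `last_update_ts` are assumptions for illustration. A real system would run an indexed query per excluded account instead of scanning lists.

```python
def posts_to_pull(followed_ids, pull_accounts, posts_by_author, last_update_ts):
    """Return posts from excluded accounts published after the last cache update."""
    # Step 1: followed accounts that were excluded from fanout on write.
    targets = [account for account in followed_ids if account in pull_accounts]
    # Step 2: time-based filter on each excluded account's posts.
    fresh = []
    for author_id in targets:
        for post in posts_by_author.get(author_id, []):
            if post["created_at"] > last_update_ts:
                fresh.append(post)
    return sorted(fresh, key=lambda p: p["created_at"], reverse=True)
```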
Dynamic Post Retrieval:
- The system performs a dynamic retrieval of posts from these accounts (via fanout on read) and integrates them into the user’s feed. This process ensures that the user receives both the real-time posts from close friends and the on-demand posts from celebrities or high-volume users.
Pagination and Infinite Scrolling:
- Since many social media platforms use pagination or infinite scrolling, the fanout on read phase can be executed incrementally as the user scrolls down their feed. As new data is needed, the system fetches the next batch of posts from the users excluded from fanout on write, ensuring a continuous and seamless user experience.
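Incremental loading is often implemented with a cursor. The sketch below assumes posts are already sorted newest first and that `created_at` values are unique, so the timestamp of the last returned post can serve as the cursor. The function name and page size are illustrative.

```python
def fetch_page(posts, cursor=None, page_size=10):
    """Return one page of posts older than `cursor`, plus the next cursor."""
    if cursor is not None:
        posts = [p for p in posts if p["created_at"] < cursor]
    page = posts[:page_size]
    # A cursor of None tells the client that there is nothing more to load.
    next_cursor = page[-1]["created_at"] if len(posts) > page_size else None
    return page, next_cursor
```

The client sends back the cursor it received each time the user scrolls to the end of the loaded posts, and stops when the cursor is `None`.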
Prioritization and Filtering:
- During fanout on read, the system can prioritize fetching the most recent or most relevant posts from these excluded accounts, based on factors like engagement or user interest.
- Filtering mechanisms can be used to ensure that only the most relevant posts are pulled into the user’s feed, avoiding information overload.
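As a rough illustration of prioritization and filtering, the sketch below scores each post by engagement and discounts it by age, then keeps only the top few. The formula, the `likes` field, and the six-hour half-life are invented for the example. Production ranking systems are far more elaborate.

```python
def rank_posts(posts, now, half_life_hours=6.0, top_k=10):
    """Order posts by an engagement score that halves every `half_life_hours`."""

    def score(post):
        age_hours = max(0.0, (now - post["created_at"]) / 3600)
        decay = 0.5 ** (age_hours / half_life_hours)
        return (1 + post["likes"]) * decay

    # Keeping only the top_k posts is the filtering step.
    return sorted(posts, key=score, reverse=True)[:top_k]
```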