Efficient cache management plays a vital role in in-memory data-parallel systems such as Spark, Tez, Storm and HANA. Recent research, notably on the Least Reference Count (LRC) and Most Reference Distance (MRD) policies, has shown that dependency-aware cache management that considers the application’s directed acyclic graph (DAG) performs well in Spark. However, these policies ignore deeper relationships between RDDs and cache redundant RDDs that share the same child RDDs, which degrades memory performance. Hence, in memory-constrained situations, systems may encounter a performance bottleneck due to frequent data block replacement. In addition, the prefetch mechanisms in some cache management policies, such as MRD, are hard to trigger. In this paper, we propose a new cache management method called RDE (Redundant Data Eviction) that fully utilizes applications’ DAG information to optimize cache decisions. By considering both RDD dependencies and the reference sequence, we effectively evict RDDs with redundant features and free memory for incoming data blocks. Experiments show that RDE improves performance by an average of 55% compared to LRU and by up to 48% and 20% compared to LRC and MRD, respectively. RDE also shows less sensitivity to memory bottlenecks, which means better availability in memory-constrained environments.
With the increasing demand for data analytics, in-memory data-parallel systems, such as Spark [
Cache optimization has been widely studied, and many efficient algorithms have been proposed to satisfy various systems [
Recent studies show that cache policies that consider data dependencies in data-parallel systems perform better than traditional history-based methods. Several dependency-aware cache policies for Spark, such as LRC [
In this paper, we discuss how the DAG can be further exploited to optimize cache management. The solution should traverse the DAGs of applications and implement DAG-based cache management with an efficient redundant block eviction strategy. Moreover, the policy should have low overhead and be applicable to DAG-based in-memory data-parallel computing systems.
We propose a novel cache management policy, Redundant Data Eviction (RDE), that can release more available memory space with low overhead. RDE discovers deeper relationships between data blocks and evicts redundant blocks accordingly. Furthermore, with the memory space freed by evicting redundant data, we introduce a prefetching mechanism into cache management for further performance improvement. RDE has the following advantages:
First, RDE minimizes the number of cached blocks by evicting targeted redundant data blocks. We analyzed a large number of DAGs from typical applications to characterize redundant data blocks. As a result, we can precisely identify redundant data using RDD dependencies and the schedule sequence in the application workflow. By evicting this redundant data, systems gain more memory space for computing and data caching, which improves performance.
Second, a cache management policy with redundant data eviction is better suited to an accompanying prefetching policy that achieves a higher hit ratio in future workflows. As mentioned above, memory is always a constrained resource in data-parallel systems. RDE is less memory sensitive and can deliver better performance in resource-constrained situations than previous cache management policies.
We implement RDE as a pluggable memory manager in Spark 2.4. To verify the efficiency of RDE, we conduct extensive evaluations on a six-node cluster with ten different data analysis workloads. Across all benchmarks, RDE shows high performance and clear advantages in memory-constrained situations. According to our experimental results, RDE reduces the application runtime by 41% on average compared with the default LRU caching policy in Spark and generally improves system performance by 35% and 20% compared to LRC and MRD, respectively.
The remainder of this paper is organized as follows. Section 2 presents the background and describes the inefficiency of existing cache strategies based on DAGs derived from system schedulers. The design of RDE and its implementation details are presented in Section 3. The evaluation results are reported in Section 4. Finally, we conclude the paper in Section 5.
In this section, we discuss the background of data access in Spark jobs and provide the motivation for introducing a novel cache management policy. We limit our discussion to the context of Spark in this paper; however, the discussion also applies to other in-memory computing frameworks.
Spark is a distributed in-memory computing framework for big data that provides the Resilient Distributed Dataset (RDD) as its primary computing abstraction. RDDs are distributed datasets stored in memory, and Spark can only derive a new RDD from an existing one through a transformation operation. The data workflow of such a parallel computing framework is therefore determined by a DAG of RDDs. These DAGs contain rich information on data dependencies, which is crucial for data caching but has not been fully exploited by the default cache management policies.
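To make this concrete, the following minimal Scala snippet (an illustration of standard Spark usage rather than code from our implementation; the input and output paths are hypothetical) builds a small lineage and caches an intermediate RDD so that later actions reuse it instead of recomputing the whole DAG:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LineageExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lineage-example").setMaster("local[*]"))

    // Each transformation creates a new RDD; together they form the job's DAG.
    val lines  = sc.textFile("hdfs:///data/input.txt")   // RDD[String]
    val words  = lines.flatMap(_.split("\\s+"))          // RDD[String]
    val pairs  = words.map(w => (w, 1))                  // RDD[(String, Int)]
    val counts = pairs.reduceByKey(_ + _)                // shuffle: new stage boundary

    // cache() marks the RDD for storage memory; which cached blocks survive
    // under memory pressure is decided by the cache management policy.
    counts.cache()

    // Two actions reuse the cached RDD instead of recomputing its lineage.
    println(counts.count())
    counts.saveAsTextFile("hdfs:///data/output")
    sc.stop()
  }
}
```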
For example, as a key abstraction in Spark, an RDD is a collection of objects partitioned across nodes in a Spark cluster [
In Spark, memory is divided into three parts: System memory (other and system reserved), execution memory and storage memory, as shown in
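The boundary between execution and storage memory is governed by standard Spark configuration properties; the fractions in the sketch below are Spark 2.x defaults and are shown only for illustration, not the values used in our experiments:

```scala
import org.apache.spark.SparkConf

object MemoryRegions {
  val conf: SparkConf = new SparkConf()
    .setAppName("memory-regions")
    // Fraction of the JVM heap (minus reserved memory) shared by execution and storage.
    .set("spark.memory.fraction", "0.6")
    // Portion of that unified region protected for storage (cached RDD blocks);
    // execution can borrow the rest, which is why cached blocks may still be evicted.
    .set("spark.memory.storageFraction", "0.5")
}
```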
We apply SparkBench [
History-based cache management is widely used in various systems. LRU is a classic history-based method and serves as the cache replacement algorithm in Spark. LRU tracks the data in memory and evicts the blocks that have not been accessed for the longest time. However, LRU is oblivious to the lineage of Spark jobs, resulting in inefficient RDD eviction.
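For reference, the following self-contained Scala sketch shows the LRU idea; it is a single-node illustration built on java.util.LinkedHashMap, not Spark's actual BlockManager code:

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// A single-node sketch of LRU eviction (illustrative only).
class LruCache[K, V](capacity: Int)
  extends JLinkedHashMap[K, V](capacity, 0.75f, /* accessOrder = */ true) {
  // Drop the least recently accessed entry once capacity is exceeded.
  override def removeEldestEntry(eldest: Entry[K, V]): Boolean = size() > capacity
}

object LruDemo extends App {
  val cache = new LruCache[String, Int](capacity = 2)
  cache.put("rdd_1_0", 1)
  cache.put("rdd_2_0", 2)
  cache.get("rdd_1_0")       // refreshes rdd_1_0's recency
  cache.put("rdd_3_0", 3)    // evicts rdd_2_0, the least recently used block
  println(cache.keySet())    // [rdd_1_0, rdd_3_0]
}
```

Note how the eviction decision depends only on access history, regardless of whether the evicted block is still needed by the remaining DAG.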
To fully utilize the DAGs and achieve a more significant performance improvement, several DAG-based cache management policies have been proposed. LRC and MRD are representative DAG-based cache policies and have been shown to perform well on common benchmarks. LRC traverses the lineage and tracks the dependency count of each RDD; this count is updated continuously as the Spark job runs and serves as the priority for evicting RDD blocks from memory. An RDD with a higher reference count is more likely to be used in future computations and should be kept in memory; to save the maximum amount of memory space, the RDD with the lowest dependency count is evicted first. Compared with the default LRU policy, LRC improves the cache hit ratio by taking the whole application workflow into account. MRD addresses a shortcoming of LRC by improving the time sensitivity of caching: it always evicts the data blocks whose reference distances are largest and, when possible, prefetches the data blocks with the lowest reference distances. MRD performs better than LRC in systems with sufficient memory.
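As a rough illustration of LRC-style bookkeeping (our own sketch with hypothetical types, not the authors' implementation), the reference count of each parent RDD can be derived from the DAG and decremented as children are materialized:

```scala
object LrcSketch {
  // child RDD id -> ids of the parent RDDs it depends on (hypothetical representation)
  type Dag = Map[Int, Seq[Int]]

  // The reference count of an RDD is the number of children that still need it.
  def referenceCounts(dag: Dag): collection.mutable.Map[Int, Int] = {
    val counts = collection.mutable.Map.empty[Int, Int].withDefaultValue(0)
    for ((_, parents) <- dag; p <- parents) counts(p) += 1
    counts
  }

  // When a child RDD has been materialized, its parents lose one reference;
  // under LRC, an RDD whose count reaches zero is the first eviction candidate.
  def onChildComputed(child: Int, dag: Dag, counts: collection.mutable.Map[Int, Int]): Unit =
    dag.getOrElse(child, Nil).foreach(p => counts(p) -= 1)
}
```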
However, when prefetching is difficult to perform and memory is constrained, frequent data block replacement causes significant performance degradation for MRD, whereas LRC can still deliver a performance improvement. Existing cache policies neglect the memory wasted by redundant RDDs.
For example, in the lineage of the Connected Component (CC) shown in
In this section, we propose a new cache management method, RDE (Redundant Data Eviction), which can make cache decisions based on the DAGs of applications with an efficient redundant RDD eviction policy. We also describe our implementation in Spark.
Definition 1 (Candidate RDDs): RDDs in a stage with the same computing sequence (CS) are defined as candidate RDDs.
The computing sequence of
Definition 2 (Redundant RDD): An RDD in a candidate RDD set is called a redundant RDD if and only if it is not the leaf RDD after a depth-first search of the candidate RDD workflow, which is derived from the DAG of the application.
These two definitions form the criterion for locating redundant RDDs in a DAG derived from the Spark scheduler. RDE is a DAG-based cache management policy with an efficient redundant RDD eviction strategy. Each RDD in a DAG has two parameters: the basic cache priority and the CS. We first traverse the DAG and compute each RDD’s CS according to its stage distance, which represents the RDD computing sequence. In each stage, we perform a depth-first search among candidate RDDs with the same CS, preserve only the leaf RDD as the cache candidate, and treat the remaining RDDs as redundant according to Definition 2. Then, we assign the redundant RDDs the lowest cache priority and rebuild the DAG to give the other RDDs new cache priorities. To coordinate with the prefetching algorithm, we use the stage distance to measure the basic cache priority of each RDD. For example, in Stage 6 of the PageRank lineage (
RDE can locate redundant RDDs quickly and avoid unnecessary memory overhead, providing systems with higher performance in memory-constrained situations. Moreover, the system logs of various workloads show that memory occupation remains at a high level in all stages due to Spark’s efficient memory management, which means that the prefetching process in existing cache management policies may lead to frequent block replacement and degrade performance. RDE is less memory sensitive and can cache more essential RDDs; it decreases the frequency of prefetching and reduces the overhead introduced by the prefetching mechanism in Spark. The eviction method can be described using the following algorithm.
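The Scala sketch below illustrates the redundant-RDD identification and priority assignment described above; the types and helper names (RddNode, candidateSets, redundantRdds) are our own simplifications based on Definitions 1 and 2, not the actual Spark patch:

```scala
object RdeEvictionSketch {
  // Minimal node representation: the stage it belongs to, its computing sequence (CS),
  // and the ids of its child RDDs.
  case class RddNode(id: Int, stage: Int, cs: Int, children: Seq[Int])

  // Definition 1: RDDs of the same stage with the same CS form a candidate set.
  def candidateSets(nodes: Seq[RddNode]): Iterable[Seq[RddNode]] =
    nodes.groupBy(n => (n.stage, n.cs)).values

  // Definition 2: within each candidate set, only the leaf (an RDD with no child
  // inside the set) is kept as the cache candidate; the rest are redundant.
  def redundantRdds(nodes: Seq[RddNode]): Set[Int] =
    candidateSets(nodes).flatMap { set =>
      val ids    = set.map(_.id).toSet
      val leaves = set.filterNot(n => n.children.exists(ids.contains))
      ids -- leaves.take(1).map(_.id)   // preserve one leaf, mark the others redundant
    }.toSet

  // Redundant RDDs get the lowest priority; the others are ranked by the stage
  // distance to their next use (a smaller distance means a higher priority).
  def cachePriority(n: RddNode, redundant: Set[Int],
                    nextUseStage: Map[Int, Int], currentStage: Int): Int =
    if (redundant.contains(n.id)) Int.MinValue
    else -(nextUseStage.getOrElse(n.id, Int.MaxValue - 1) - currentStage)
}
```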
As mentioned above, we use the computing sequence to measure the cache priority of each RDD. Computation in Spark occurs when a new stage is launched, so the RDD computing sequence can be represented by the stage computing sequence. Each RDD has various stage distances that represent its different schedule orders in the entire workflow. An RDD with a finite CS will be used in future computations of the current application, and we consider it a prefetchable RDD. However, prefetchable RDDs with lower cache priority must be written to disk to make room for higher-priority RDDs. We keep a computing-sequence table for prefetchable RDDs written to disk; when the cache priority of a prefetchable RDD rises above that of a cached RDD as the job runs, the prefetching mechanism is triggered and the RDD is read back from disk. RDE is less memory sensitive, so prefetching occurs less frequently than under existing cache policies, and RDDs with high priorities are less likely to be evicted by the prefetching mechanism, which reduces the overhead introduced by prefetching. The RDE cache management policy with prefetching can be described by the following algorithm.
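The prefetching side can be sketched similarly; the types and the triggering condition below are a simplified illustration of the description above rather than the exact implementation:

```scala
object RdePrefetchSketch {
  // A prefetchable RDD that was spilled to disk, with the stage of its next use.
  case class SpilledRdd(id: Int, nextUseStage: Int)

  class PrefetchTable {
    private val spilled = collection.mutable.Map.empty[Int, SpilledRdd]

    // Record an RDD that was written to disk but will be needed again later.
    def record(r: SpilledRdd): Unit = spilled(r.id) = r

    // Called at a stage boundary: pick spilled RDDs whose upcoming use is nearer
    // than the farthest use of any cached RDD, limited by the free block slots.
    def toPrefetch(currentStage: Int, cachedNextUse: Map[Int, Int], freeBlocks: Int): Seq[Int] = {
      val farthestCachedUse =
        if (cachedNextUse.isEmpty) Int.MaxValue else cachedNextUse.values.max
      spilled.values.toSeq
        .filter(r => r.nextUseStage >= currentStage && r.nextUseStage < farthestCachedUse)
        .sortBy(_.nextUseStage)
        .take(freeBlocks)
        .map(_.id)
    }
  }
}
```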
We observe that computing the basic cache priority requires traversing the entire application’s DAG. However, in systems such as Spark, applications usually consist of several jobs, and only the DAG of the current job can be obtained from the Spark scheduler. Therefore, obtaining the entire DAG of an application is a challenge. To solve this problem, we consider our cache policy in two situations.
In most cases, applications that run on in-memory data-parallel systems are recurring and repeat jobs with the same DAG to process different data sets. Therefore, it is feasible to learn the entire DAG from previous jobs, so our cache policy performs better on these applications.
For nonrecurring applications whose jobs have different DAGs, RDE works within each single job in the same way as for recurring applications, but the redundant RDDs and the cache priorities must be recomputed when a new job arrives. The hit ratio therefore drops by a certain percentage compared with recurring applications.
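A minimal sketch of how DAG profiles learned from earlier runs of a recurring job might be stored and reused is shown below; the job-signature keying scheme is our assumption, not part of the implementation:

```scala
object RecurringJobSketch {
  // Maps a job signature (e.g. a hash of its stage/RDD structure; the keying
  // scheme is an assumption) to the cache priorities learned from a previous run.
  class DagProfileStore {
    private val profiles = collection.mutable.Map.empty[String, Map[Int, Int]]

    // Recurring job: reuse the stored priorities directly.
    def lookup(signature: String): Option[Map[Int, Int]] = profiles.get(signature)

    // Unseen DAG: priorities are recomputed for the new job and stored for future runs.
    def update(signature: String, priorities: Map[Int, Int]): Unit =
      profiles(signature) = priorities
  }
}
```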
API | Description
---|---
DAGProfile | REManager reconstructs the DAG and returns the cache priority for each RDD by recomputing the stage distance
updateCachePriority | REManager sends a new cache priority file to CacheMonitor
updateDSD | DAGAnalyzer returns a new dependency similarity index when receiving new DAGs
BlocksEviction | When the cache is full, data blocks with low cache priority are evicted
DataPrefetch | Prefetches the specific blocks to be used in the next stage
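The table above can be read as the following hypothetical Scala interfaces; the method names follow the table, while the signatures and parameter types are our assumptions:

```scala
object RdeApiSketch {
  type RddId = Int
  type Dag   = Map[RddId, Seq[RddId]]   // child RDD -> parent RDDs

  trait REManager {
    def DAGProfile(dag: Dag): Map[RddId, Int]                   // cache priority per RDD from stage distances
    def updateCachePriority(priorities: Map[RddId, Int]): Unit  // push new priorities to CacheMonitor
  }

  trait DAGAnalyzer {
    def updateDSD(dag: Dag): Double                             // dependency similarity index for a new DAG
  }

  trait CacheMonitor {
    def BlocksEviction(bytesNeeded: Long): Seq[String]          // evict lowest-priority blocks when full
    def DataPrefetch(nextStage: Int): Seq[String]               // blocks to prefetch for the next stage
  }
}
```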
In this section, we evaluate the performance of our cache policy with typical benchmarks.
Our experimental platform consisted of several virtual machines on two high-performance blade servers, each with 32 cores and 64 GB of memory. The main tests were conducted in this virtual environment with nine nodes: one master and eight slave nodes. The master node was given a more powerful configuration to satisfy the computing demands of the cache policies. All nodes were deployed with Spark 2.4.0 and Hadoop 2.8.0. The datasets were generated by SparkBench. The workloads and the amounts of input data are given in
Workload | Amount of input data (GB)
---|---
KMeans | 10
PageRank | 11
Connected component | 8
PregelOperation | 7
SVD++ | 8.3
The master was configured with 8 cores and 8 GB of memory, while each slave node was configured with 4 cores and 4 GB of memory. We compared RDE with Spark’s native LRU cache policy and with two representative DAG-based cache policies, LRC and MRD. We show results for two scenarios: RDE with eviction only, and RDE with both eviction and prefetching. RDE performs well in both scenarios, especially with prefetching, significantly decreasing the benchmark runtime by increasing the cache hit ratio.
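For reference, the per-node resources above correspond to standard Spark properties such as those in the following sketch (an illustrative configuration, not our exact submission scripts; mapping the master to the driver is our simplification):

```scala
import org.apache.spark.SparkConf

object ClusterConfSketch {
  val conf: SparkConf = new SparkConf()
    .set("spark.driver.cores", "8")      // master node: 8 cores
    .set("spark.driver.memory", "8g")    // master node: 8 GB
    .set("spark.executor.cores", "4")    // each slave executor: 4 cores
    .set("spark.executor.memory", "4g")  // each slave executor: 4 GB
}
```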
RDE with eviction only. We ran RDE in eviction-only mode on several Spark benchmarks and compared its performance with that of the LRU, LRC and MRD policies. The results are shown in
The application runtimes are reduced by up to 56% compared to the original LRU cache policy. Furthermore, because RDE considers both the dependencies and the computing sequence of RDDs, it is more time sensitive than LRC in caching RDDs and achieves up to a 30% performance improvement on the Connected Component (CC) workload. RDE with eviction only also yields a 9% to 15% performance improvement over MRD with eviction only, since RDE caches fewer redundant RDDs and thus obtains a better hit ratio. In general, RDE provides a significant performance improvement due to its efficient eviction policy.
RDE with prefetching. RDE evicts redundant RDDs to free memory space for more valuable RDDs. With this mechanism, RDE is better suited to prefetching in memory-constrained situations. We ran RDE with prefetching on the same benchmarks and datasets as above, and the results compared with those of LRU, LRC and MRD-Prefetch are shown in
RDE with prefetching combines the eviction policy with the prefetching method to obtain a further performance improvement. Compared to the eviction-only policies, RDE with prefetching improves the cache hit ratio; the performance improvement reaches 63% and 48% compared to LRU and LRC, respectively. We also compared our policy to MRD-Prefetch, another DAG-based cache policy with prefetching. RDE achieves an approximately 9% to 20% performance advantage, which stems from its eviction of redundant RDDs. Especially in memory-constrained situations, RDE can make full use of the memory space and makes the prefetching policy easier to trigger.
As mentioned above, RDE is less memory sensitive and performs well in memory-constrained situations. We varied the executor memory size from 2 to 6 GB and compared RDE-Prefetch with LRU, LRC and MRD-Prefetch on several benchmarks with different configurations, in order to examine the influence of memory size on the different cache policies.
As shown in
In this paper, we present RDE, a DAG-based cache management policy with redundant data eviction for Spark. RDE traverses the lineage of an application and computes the degree of dependency similarity for each RDD. Redundant RDDs are never cached in the workflow, which makes RDE perform better in memory-constrained situations. Moreover, we add a prefetching mechanism to RDE to obtain a better cache hit ratio. Compared to the LRC and MRD policies, RDE achieves 35% and 20% performance improvements, respectively, under memory-constrained circumstances.