/*
 * Copyright (C) 2017, Google Inc. and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

package org.eclipse.jgit.internal.storage.dfs;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.eclipse.jgit.lib.Constants.OBJ_BLOB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.storage.pack.PackExt;
import org.eclipse.jgit.junit.TestRepository;
import org.eclipse.jgit.junit.TestRng;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.revwalk.RevCommit;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

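/**
 * Tests for {@link DfsBlockCache}: block reuse across pack streams, odd
 * readable channel block sizes, the cache hot map, and index loading under
 * different cache concurrency levels.
 */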
public class DfsBlockCacheTest {
	@Rule
	public TestName testName = new TestName();
	private TestRng rng;
	private DfsBlockCache cache;
	private ExecutorService pool;

	@Before
	public void setUp() {
		rng = new TestRng(testName.getMethodName());
		pool = Executors.newFixedThreadPool(10);
		resetCache();
	}

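	// Blocks cached while r1 writes the pack share the pack's stream key, so
	// r2 reading the same pack hits the cache: no new misses and no growth in
	// the cache size.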
	@SuppressWarnings("resource")
	@Test
	public void streamKeyReusesBlocks() throws Exception {
		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		long oldSize = LongStream.of(cache.getCurrentSize()).sum();
		assertTrue(oldSize > 2000);
		assertEquals(0, LongStream.of(cache.getHitCount()).sum());

		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();
		InMemoryRepository r2 = new InMemoryRepository(repo);
		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r2.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
		assertEquals(oldSize, LongStream.of(cache.getCurrentSize()).sum());
	}

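	// Reading through a readable channel whose block size (500) does not
	// match the cache block size (512) must still return correct contents
	// for both a tiny object and a multi-block object.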
	@SuppressWarnings("resource")
	@Test
	public void weirdBlockSize() throws Exception {
		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);

		byte[] content1 = rng.nextBytes(4);
		byte[] content2 = rng.nextBytes(424242);
		ObjectId id1;
		ObjectId id2;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id1 = ins.insert(OBJ_BLOB, content1);
			id2 = ins.insert(OBJ_BLOB, content2);
			ins.flush();
		}

		resetCache();
		List<DfsPackDescription> packs = r1.getObjectDatabase().listPacks();

		InMemoryRepository r2 = new InMemoryRepository(repo);
		r2.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
		r2.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r2.newObjectReader()) {
			byte[] actual = rdr.open(id1, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content1, actual));
		}

		InMemoryRepository r3 = new InMemoryRepository(repo);
		r3.getObjectDatabase().setReadableChannelBlockSizeForTest(500);
		r3.getObjectDatabase().commitPack(packs, Collections.emptyList());
		try (ObjectReader rdr = r3.newObjectReader()) {
			byte[] actual = rdr.open(id2, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content2, actual));
		}
	}

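	// With INDEX given extra weight in the cache hot map, pack blocks are
	// evicted under memory pressure while index blocks are retained.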
	@SuppressWarnings("resource")
	@Test
	public void hasCacheHotMap() throws Exception {
		Map<PackExt, Integer> cacheHotMap = new HashMap<>();
		// Pack index will be kept in cache longer.
		cacheHotMap.put(PackExt.INDEX, Integer.valueOf(3));
		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setBlockLimit(512 * 4).setCacheHotMap(cacheHotMap));
		cache = DfsBlockCache.getInstance();

		DfsRepositoryDescription repo = new DfsRepositoryDescription("test");
		InMemoryRepository r1 = new InMemoryRepository(repo);
		byte[] content = rng.nextBytes(424242);
		ObjectId id;
		try (ObjectInserter ins = r1.newObjectInserter()) {
			id = ins.insert(OBJ_BLOB, content);
			ins.flush();
		}

		try (ObjectReader rdr = r1.newObjectReader()) {
			byte[] actual = rdr.open(id, OBJ_BLOB).getBytes();
			assertTrue(Arrays.equals(content, actual));
		}
		// All cache entries are hot and the cache is at capacity.
		assertTrue(LongStream.of(cache.getHitCount()).sum() > 0);
		assertEquals(99, cache.getFillPercentage());

		InMemoryRepository r2 = new InMemoryRepository(repo);
		content = rng.nextBytes(424242);
		try (ObjectInserter ins = r2.newObjectInserter()) {
			ins.insert(OBJ_BLOB, content);
			ins.flush();
		}
		assertEquals(0, LongStream.of(cache.getMissCount()).sum());
		assertTrue(cache.getEvictions()[PackExt.PACK.getPosition()] > 0);
		assertEquals(0, cache.getEvictions()[PackExt.INDEX.getPosition()]);
	}

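	// With concurrency level 1 the block loads are serialized; each index
	// type of the non-garbage pack is read from storage exactly once despite
	// the duplicate bitmap request.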
	@SuppressWarnings("resource")
	@Test
	public void noConcurrencySerializedReads_oneRepo() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		// Reset the cache with concurrency level 1, i.e. no concurrency.
		resetCache(1);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
			// Only load the non-garbage pack, which has a bitmap.
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.getPosition()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.getPosition()]);
		// The reverse index has no pack extension; it defaults to position 0.
		assertEquals(1, cache.getMissCount()[0]);
	}

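	// Each repository has its own pack, so with serialized reads every index
	// type misses twice: once per repository.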
	@SuppressWarnings("resource")
	@Test
	public void noConcurrencySerializedReads_twoRepos() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(1);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
		// Safety check that both repos have the same number of packs.
		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1Packs.length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}

		waitForExecutorPoolTermination();
		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.getPosition()]);
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.getPosition()]);
		assertEquals(2, cache.getMissCount()[0]);
	}

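	// Concurrency level 2 lets the two repositories load their bitmaps in
	// parallel; each index type still misses exactly once per repository.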
	@SuppressWarnings("resource")
	@Test
	public void lowConcurrencyParallelReads_twoRepos() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(2);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
		// Safety check that both repos have the same number of packs.
		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1Packs.length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}

		waitForExecutorPoolTermination();
		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.getPosition()]);
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.getPosition()]);
		assertEquals(2, cache.getMissCount()[0]);
	}

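	// Same as above, but r1 also requests its pack index in parallel with the
	// bitmap; the pack index is still loaded only once per repository.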
	@SuppressWarnings("resource")
	@Test
	public void lowConcurrencyParallelReads_twoReposAndIndex()
			throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test1");
		InMemoryRepository r2 = createRepoWithBitmap("test2");
		resetCache(2);

		DfsReader reader = (DfsReader) r1.newObjectReader();
		DfsPackFile[] r1Packs = r1.getObjectDatabase().getPacks();
		DfsPackFile[] r2Packs = r2.getObjectDatabase().getPacks();
		// Safety check that both repos have the same number of packs.
		assertEquals(r1Packs.length, r2Packs.length);

		for (int i = 0; i < r1Packs.length; ++i) {
			DfsPackFile pack1 = r1Packs[i];
			DfsPackFile pack2 = r2Packs[i];
			if (pack1.isGarbage() || pack2.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack1.getBitmapIndex(reader));
			asyncRun(() -> pack1.getPackIndex(reader));
			asyncRun(() -> pack2.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(2, cache.getMissCount()[PackExt.BITMAP_INDEX.getPosition()]);
		// Index is loaded once for each repo.
		assertEquals(2, cache.getMissCount()[PackExt.INDEX.getPosition()]);
		assertEquals(2, cache.getMissCount()[0]);
	}

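	// At the default concurrency level, concurrent loads of the same pack's
	// indexes still produce a single miss per index type.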
	@SuppressWarnings("resource")
	@Test
	public void highConcurrencyParallelReads_oneRepo() throws Exception {
		InMemoryRepository r1 = createRepoWithBitmap("test");
		resetCache();

		DfsReader reader = (DfsReader) r1.newObjectReader();
		for (DfsPackFile pack : r1.getObjectDatabase().getPacks()) {
			// Only load the non-garbage pack, which has a bitmap.
			if (pack.isGarbage()) {
				continue;
			}
			asyncRun(() -> pack.getBitmapIndex(reader));
			asyncRun(() -> pack.getPackIndex(reader));
			asyncRun(() -> pack.getBitmapIndex(reader));
		}
		waitForExecutorPoolTermination();

		assertEquals(1, cache.getMissCount()[PackExt.BITMAP_INDEX.getPosition()]);
		assertEquals(1, cache.getMissCount()[PackExt.INDEX.getPosition()]);
		assertEquals(1, cache.getMissCount()[0]);
	}

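	// Replaces the singleton cache with one using a small 512 byte block
	// size, so the multi-hundred-kilobyte test blobs span many blocks.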
	private void resetCache() {
		resetCache(32);
	}

	private void resetCache(int concurrencyLevel) {
		DfsBlockCache.reconfigure(new DfsBlockCacheConfig().setBlockSize(512)
				.setConcurrencyLevel(concurrencyLevel).setBlockLimit(1 << 20));
		cache = DfsBlockCache.getInstance();
	}

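	// Creates a repository with commits on two branches and garbage collects
	// it, producing a GC pack that carries a bitmap index.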
	private InMemoryRepository createRepoWithBitmap(String repoName)
			throws Exception {
		DfsRepositoryDescription repoDesc = new DfsRepositoryDescription(
				repoName);
		InMemoryRepository repo = new InMemoryRepository(repoDesc);
		try (TestRepository<InMemoryRepository> repository = new TestRepository<>(
				repo)) {
			RevCommit commit = repository.branch("/refs/ref1" + repoName)
					.commit().add("blob1", "blob1" + repoName).create();
			repository.branch("/refs/ref2" + repoName).commit()
					.add("blob2", "blob2" + repoName).parent(commit).create();
		}
		new DfsGarbageCollector(repo).pack(null);
		return repo;
	}

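	// Runs the task on the shared pool; exceptions are swallowed because the
	// tests only assert on the cache counters afterwards.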
	private void asyncRun(Callable<?> call) {
		pool.execute(() -> {
			try {
				call.call();
			} catch (Exception e) {
				// Ignore.
			}
		});
	}

	private void waitForExecutorPoolTermination() throws Exception {
		pool.shutdown();
		pool.awaitTermination(500, MILLISECONDS);
		assertTrue("Threads did not complete, likely due to a deadlock.",
				pool.isTerminated());
	}
}