metric

介绍

Metrics Registries类似一个metrics容器,维护一个Map,可以是一个服务一个实例。支持五种metric类型:Gauges、Counters、Meters、Histograms和Timers。
git:https://github.com/dropwizard/metrics

Gauges

Gauges是一个最简单的计量,一般用来统计瞬时状态的数据信息,比如系统中处于pending状态的job

Meters

Meters用来度量某个时间段的平均处理次数(request per second),每1、5、15分钟的TPS。比如一个service的请求数,通过metrics.meter()实例化一个Meter之后,然后通过 meter.mark()方法就能将本次请求记录下来。统计结果有总的请求数,平均每秒的请求数,以及最近的1、5、15分钟的平均TPS。

Histograms

Histograms主要用来统计数据的分布情况:最大值、最小值、平均值、中位数以及百分位数(75%、90%、95%、98%、99%和99.9%)。例如,需要统计某个页面的请求响应时间分布情况,可以使用该种类型的Metrics进行统计。

Timers

Timers主要是用来统计某一块代码段的执行时间以及其分布情况,具体是基于Histograms和Meters来实现的。

源码包分析

metrics-annotation

Counted,Gauge,Metered,Metric,Timed注解的方式设置name,tags

metrics-benchmarks

Counter 的基准(benchmark)测试

metrics-collectd

Collectd - 收集服务客户端
打开InetSocketAddress,通过DatagramChannel发送字节数据。

metrics-core

核心类包

CachedGauge.java

gauge缓存抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package io.dropwizard.metrics;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link Gauge} implementation which caches its value for a period of time.
 *
 * <p>The cached value is produced by {@link #loadValue()} and handed out by
 * {@link #getValue()} until the configured timeout has elapsed on the gauge's {@link Clock};
 * the next read after that reloads it.
 *
 * @param <T> the type of the gauge's value
 */
public abstract class CachedGauge<T> implements Gauge<T> {
    private final Clock clock;
    /** Tick, on the clock's nanosecond scale, at which the cached value expires. */
    private final AtomicLong reloadAt;
    /** Cache lifetime converted to nanoseconds. */
    private final long timeoutNS;

    /** Most recently loaded value; {@code null} until a load has completed. */
    private volatile T value;

    /**
     * Creates a new cached gauge with the given timeout period, using the default clock.
     *
     * @param timeout     the timeout
     * @param timeoutUnit the unit of {@code timeout}
     */
    protected CachedGauge(long timeout, TimeUnit timeoutUnit) {
        this(Clock.defaultClock(), timeout, timeoutUnit);
    }

    /**
     * Creates a new cached gauge with the given clock and timeout period.
     *
     * @param clock       the clock used to calculate the timeout
     * @param timeout     the timeout
     * @param timeoutUnit the unit of {@code timeout}
     */
    protected CachedGauge(Clock clock, long timeout, TimeUnit timeoutUnit) {
        this.clock = clock;
        // Seed the deadline with the current tick instead of 0: ticks have an arbitrary origin
        // and may be negative, in which case a deadline of 0 would lie far in the future and
        // the gauge would never load a value.
        this.reloadAt = new AtomicLong(clock.getTick());
        this.timeoutNS = timeoutUnit.toNanos(timeout);
    }

    /**
     * Loads the value and returns it. Called whenever the cached value is missing or expired.
     *
     * @return the new value
     */
    protected abstract T loadValue();

    /**
     * Returns the cached value, reloading it first when it has expired or when nothing has
     * been cached yet.
     *
     * @return the current (possibly freshly loaded) value
     */
    @Override
    public T getValue() {
        // The null check covers a reader that lost the reload race before the very first
        // value was stored; without it that reader would be handed null.
        if (shouldLoad() || value == null) {
            final T loaded = loadValue();
            this.value = loaded;
            return loaded;
        }
        return value;
    }

    /**
     * Decides whether the calling thread must reload the value. When the deadline has passed,
     * exactly one caller wins the compare-and-set, moves the deadline forward by the timeout
     * and gets {@code true}; a caller that loses the race re-evaluates against the new deadline.
     *
     * @return {@code true} if the caller should reload the value
     */
    private boolean shouldLoad() {
        for (; ; ) {
            final long time = clock.getTick();
            final long current = reloadAt.get();
            // Compare by difference rather than by magnitude so the check stays correct when
            // "time + timeoutNS" has wrapped around.
            if (current - time > 0) {
                return false;
            }
            if (reloadAt.compareAndSet(current, time + timeoutNS)) {
                return true;
            }
        }
    }

    @Override
    public String toString() {
        return String.valueOf(this.getValue());
    }
}

Clock.java

抽象的时间类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package io.dropwizard.metrics;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

/**
 * An abstraction for how time passes. It is passed to {@link Timer} to track timing.
 *
 * <p>Subclasses supply the nanosecond tick source via {@link #getTick()}; the wall-clock
 * time in milliseconds is available through {@link #getTime()}.
 */
public abstract class Clock {
    /** Shared instance handed out by {@link #defaultClock()}. */
    private static final Clock DEFAULT = new UserTimeClock();

    /**
     * The default clock to use.
     *
     * @return the default {@link Clock} instance
     * @see Clock.UserTimeClock
     */
    public static Clock defaultClock() {
        return DEFAULT;
    }

    /**
     * Returns the current time tick.
     *
     * @return time tick in nanoseconds
     */
    public abstract long getTick();

    /**
     * Returns the current time in milliseconds, as reported by
     * {@link System#currentTimeMillis()}.
     *
     * @return time in milliseconds
     */
    public long getTime() {
        return System.currentTimeMillis();
    }

    /**
     * Renders the value of {@link #getTime()} as a decimal string.
     */
    @Override
    public String toString() {
        final long millis = getTime();
        return String.valueOf(millis);
    }

    /**
     * A clock implementation whose ticks are taken from {@link System#nanoTime()}.
     */
    public static class UserTimeClock extends Clock {
        @Override
        public long getTick() {
            return System.nanoTime();
        }
    }

    /**
     * A clock implementation which returns the current thread's CPU time.
     */
    public static class CpuTimeClock extends Clock {
        private static final ThreadMXBean THREAD_MX_BEAN = ManagementFactory.getThreadMXBean();

        @Override
        public long getTick() {
            return THREAD_MX_BEAN.getCurrentThreadCpuTime();
        }
    }
}

Counter.java

计数器类(非抽象类),支持增加和减少

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package io.dropwizard.metrics;

/**
 * An incrementing and decrementing counter metric.
 */
public class Counter implements Metric, Counting {
    /** Accumulator holding the running total. */
    private final LongAdder count;

    /**
     * Creates a counter backed by an adder obtained from {@link LongAdderFactory}.
     */
    public Counter() {
        count = LongAdderFactory.create();
    }

    /**
     * Increment the counter by one.
     */
    public void inc() {
        inc(1);
    }

    /**
     * Increment the counter by {@code n}.
     *
     * @param n the amount by which the counter will be increased
     */
    public void inc(long n) {
        count.add(n);
    }

    /**
     * Decrement the counter by one.
     */
    public void dec() {
        dec(1);
    }

    /**
     * Decrement the counter by {@code n}.
     *
     * @param n the amount by which the counter will be decreased
     */
    public void dec(long n) {
        final long delta = -n;
        count.add(delta);
    }

    /**
     * Returns the counter's current value.
     *
     * @return the counter's current value
     */
    @Override
    public long getCount() {
        return count.sum();
    }

    /**
     * Returns the string form of the underlying adder.
     */
    @Override
    public String toString() {
        return String.valueOf(count);
    }
}

Reservoir.java

Reservoir(蓄水池):对数据流进行具有统计代表性的采样存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package io.dropwizard.metrics;

/**
* A statistically representative reservoir of a data stream.
*
* Values are fed in one at a time through {@link #update(long)}; the recorded
* values can be inspected at any time through {@link #getSnapshot()}.
*/
public interface Reservoir {
/**
* Returns the number of values recorded.
*
* @return the number of values recorded
*/
int size();

/**
* Adds a new recorded value to the reservoir.
*
* @param value a new recorded value
*/
void update(long value);

/**
* Returns a snapshot of the reservoir's values.
*
* @return a snapshot of the reservoir's values
*/
Snapshot getSnapshot();
}

Histogram.java

直方图
实现:Metric、Sampling(提供快照)、Counting接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package io.dropwizard.metrics;

import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * A metric which calculates the distribution of a value.
 *
 * <p>Every recorded value is counted and forwarded to the backing {@link Reservoir}, which
 * also supplies the {@link Snapshot} returned by {@link #getSnapshot()}.
 *
 * @see <a href="http://www.johndcook.com/standard_deviation.html">Accurately computing running
 * variance</a>
 */
public class Histogram implements Metric, Sampling, Counting {

    /** Storage for the recorded values. */
    private final Reservoir reservoir;
    /** Total number of values recorded through {@link #update(long)}. */
    private final LongAdder count;

    /**
     * Creates a new {@link Histogram} with the given reservoir.
     *
     * @param reservoir the reservoir to create a histogram from
     */
    public Histogram(Reservoir reservoir) {
        this.reservoir = reservoir;
        this.count = LongAdderFactory.create();
    }

    /**
     * Adds a recorded value.
     *
     * @param value the value to record
     */
    public void update(int value) {
        update((long) value);
    }

    /**
     * Adds a recorded value: increments the count and passes the value to the reservoir.
     *
     * @param value the value to record
     */
    public void update(long value) {
        count.increment();
        reservoir.update(value);
    }

    /**
     * Returns the number of values recorded.
     *
     * @return the number of values recorded
     */
    @Override
    public long getCount() {
        return count.sum();
    }

    /**
     * Returns a snapshot of the recorded values, taken from the reservoir.
     *
     * @return the reservoir's snapshot
     */
    @Override
    public Snapshot getSnapshot() {
        return reservoir.getSnapshot();
    }

    /**
     * Returns the current snapshot's dump, decoded as UTF-8.
     */
    @Override
    public String toString() {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        this.getSnapshot().dump(out);
        // Decode with the Charset constant: unlike the charset-name overload this cannot
        // throw UnsupportedEncodingException, so no unreachable fallback branch is needed.
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
}

MetricName.java

设置metric的名字,tag

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
package io.dropwizard.metrics;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * A metric name with the ability to include semantic tags.
 *
 * <p>This replaces the previous style where metric names were strictly dot-separated strings.
 *
 * <p>{@link #hashCode()}, {@link #equals(Object)} and {@link #compareTo(MetricName)} all depend
 * on the tags. The map handed to {@link #MetricName(String, Map)} is wrapped, not copied, so
 * it must not be modified after the name has been created.
 *
 * @author udoprog
 */
public class MetricName implements Comparable<MetricName> {
    public static final String SEPARATOR = ".";
    public static final Map<String, String> EMPTY_TAGS = Collections.unmodifiableMap(new HashMap<String, String>());
    public static final MetricName EMPTY = new MetricName();

    private final String key;
    private final Map<String, String> tags;

    /**
     * Creates a name with a {@code null} key and no tags.
     */
    public MetricName() {
        this.key = null;
        this.tags = EMPTY_TAGS;
    }

    /**
     * Creates a name with the given key and no tags.
     *
     * @param key the key of the metric name
     */
    public MetricName(String key) {
        this.key = key;
        this.tags = EMPTY_TAGS;
    }

    /**
     * Creates a name with the given key and tags.
     *
     * @param key  the key of the metric name
     * @param tags the tags; {@code null} or an empty map is replaced by {@link #EMPTY_TAGS}
     */
    public MetricName(String key, Map<String, String> tags) {
        this.key = key;
        this.tags = checkTags(tags);
    }

    /**
     * Normalizes a tag map: {@code null} and empty maps become {@link #EMPTY_TAGS}, anything
     * else is wrapped in an unmodifiable view.
     */
    private Map<String, String> checkTags(Map<String, String> tags) {
        if (tags == null || tags.isEmpty()) {
            return EMPTY_TAGS;
        }

        return Collections.unmodifiableMap(tags);
    }

    public String getKey() {
        return key;
    }

    public Map<String, String> getTags() {
        return tags;
    }

    /**
     * Build the MetricName that is this with another path appended to it.
     *
     * <p>The new MetricName inherits the tags of this one. A {@code null} or empty {@code p}
     * leaves the key unchanged; a {@code null} or empty key is replaced by {@code p}.
     *
     * @param p The extra path element to add to the new metric.
     * @return A new metric name relative to the original by the path specified in p.
     */
    public MetricName resolve(String p) {
        final String next;

        if (p != null && !p.isEmpty()) {
            if (key != null && !key.isEmpty()) {
                next = key + SEPARATOR + p;
            } else {
                next = p;
            }
        } else {
            next = this.key;
        }

        return new MetricName(next, tags);
    }

    /**
     * Add tags to a metric name and return the newly created MetricName.
     *
     * <p>When a tag key is present both in {@code add} and in this name, the value already
     * held by this name is kept.
     *
     * @param add Tags to add; {@code null} is treated like "nothing to add" and returns this
     *            name, mirroring {@link #tagged(String...)}.
     * @return A newly created metric name with the specified tags associated with it.
     */
    public MetricName tagged(Map<String, String> add) {
        if (add == null) {
            return this;
        }

        final Map<String, String> merged = new HashMap<String, String>(add);
        // Existing tags are applied last, so they win over entries from "add".
        merged.putAll(this.tags);
        return new MetricName(key, merged);
    }

    /**
     * Same as {@link #tagged(Map)}, but takes a variadic list of arguments.
     *
     * @see #tagged(Map)
     * @param pairs An even list of strings acting as key-value pairs.
     * @return A newly created metric name with the specified tags associated with it, or this
     *         name when {@code pairs} is {@code null}.
     * @throws IllegalArgumentException if the number of arguments is odd
     */
    public MetricName tagged(String... pairs) {
        if (pairs == null) {
            return this;
        }

        if (pairs.length % 2 != 0) {
            throw new IllegalArgumentException("Argument count must be even");
        }

        final Map<String, String> add = new HashMap<String, String>();

        for (int i = 0; i < pairs.length; i += 2) {
            add.put(pairs[i], pairs[i + 1]);
        }

        return tagged(add);
    }

    /**
     * Join the specified set of metric names.
     *
     * <p>Parts with a {@code null} or empty key contribute their tags only.
     *
     * @param parts Multiple metric names to join using the separator.
     * @return A newly created metric name which has the name of the specified parts and
     *         includes all tags of all child metric names.
     **/
    public static MetricName join(MetricName... parts) {
        final StringBuilder nameBuilder = new StringBuilder();
        final Map<String, String> tags = new HashMap<String, String>();

        boolean first = true;

        for (MetricName part : parts) {
            final String name = part.getKey();

            if (name != null && !name.isEmpty()) {
                if (first) {
                    first = false;
                } else {
                    nameBuilder.append(SEPARATOR);
                }

                nameBuilder.append(name);
            }

            if (!part.getTags().isEmpty()) {
                tags.putAll(part.getTags());
            }
        }

        return new MetricName(nameBuilder.toString(), tags);
    }

    /**
     * Build a new metric name using the specific path components.
     *
     * @param parts Path of the new metric name.
     * @return A newly created metric name with the specified path, or {@link #EMPTY} when no
     *         parts are given.
     **/
    public static MetricName build(String... parts) {
        if (parts == null || parts.length == 0) {
            return MetricName.EMPTY;
        }

        if (parts.length == 1) {
            return new MetricName(parts[0], EMPTY_TAGS);
        }

        return new MetricName(buildName(parts), EMPTY_TAGS);
    }

    /**
     * Joins the non-null, non-empty names with {@link #SEPARATOR}.
     */
    private static String buildName(String... names) {
        final StringBuilder builder = new StringBuilder();
        boolean first = true;

        for (String name : names) {
            if (name == null || name.isEmpty()) {
                continue;
            }

            if (first) {
                first = false;
            } else {
                builder.append(SEPARATOR);
            }

            builder.append(name);
        }

        return builder.toString();
    }

    /**
     * Returns the key alone when there are no tags (this is {@code null} for a name without a
     * key), otherwise the key followed by the tag map's string form.
     */
    @Override
    public String toString() {
        if (tags.isEmpty()) {
            return key;
        }

        return key + tags;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((key == null) ? 0 : key.hashCode());
        // tags is never null: every constructor assigns EMPTY_TAGS or a wrapped map.
        result = prime * result + tags.hashCode();
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }

        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        final MetricName other = (MetricName) obj;
        final boolean sameKey = (key == null) ? other.key == null : key.equals(other.key);
        return sameKey && tags.equals(other.tags);
    }

    /**
     * Orders names by key first and by tags second. A {@code null} argument is not rejected;
     * this name sorts before it.
     */
    @Override
    public int compareTo(MetricName o) {
        if (o == null) {
            return -1;
        }

        final int c = compareName(key, o.getKey());

        if (c != 0) {
            return c;
        }

        return compareTags(tags, o.getTags());
    }

    /**
     * Compares two keys, sorting {@code null} after every non-null key.
     */
    private int compareName(String left, String right) {
        if (left == null && right == null) {
            return 0;
        }

        if (left == null) {
            return 1;
        }

        if (right == null) {
            return -1;
        }

        return left.compareTo(right);
    }

    /**
     * Compares two tag maps by walking the union of their keys in sorted order; the first key
     * whose values differ decides, and a missing value sorts before a present one.
     */
    private int compareTags(Map<String, String> left, Map<String, String> right) {
        if (left == null && right == null) {
            return 0;
        }

        if (left == null) {
            return 1;
        }

        if (right == null) {
            return -1;
        }

        final Iterable<String> keys = uniqueSortedKeys(left, right);

        for (final String key : keys) {
            final String a = left.get(key);
            final String b = right.get(key);

            if (a == null && b == null) {
                continue;
            }

            if (a == null) {
                return -1;
            }

            if (b == null) {
                return 1;
            }

            final int c = a.compareTo(b);

            if (c != 0) {
                return c;
            }
        }

        return 0;
    }

    /**
     * Returns the keys of both maps, de-duplicated and in natural order.
     */
    private Iterable<String> uniqueSortedKeys(Map<String, String> left, Map<String, String> right) {
        final Set<String> set = new TreeSet<String>(left.keySet());
        set.addAll(right.keySet());
        return set;
    }
}

MetricRegistry.java

注册MetricRegistry,构建简单metric类型对象,创建和获取counter,histogram,meter,timer监测的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
package io.dropwizard.metrics;

import io.dropwizard.metrics.Counter;
import io.dropwizard.metrics.ExponentiallyDecayingReservoir;
import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.Histogram;
import io.dropwizard.metrics.Meter;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricFilter;
import io.dropwizard.metrics.MetricRegistry;
import io.dropwizard.metrics.MetricRegistryListener;
import io.dropwizard.metrics.MetricSet;
import io.dropwizard.metrics.Timer;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* A registry of metric instances.
*
* 监测实例:新增监测对象,监测分类
*
*/
public class MetricRegistry implements MetricSet {

/**
* @see #name(String, String...)
* 名字
*/
public static MetricName name(Class<?> klass, String... names) {
return name(klass.getName(), names);
}

/**
* Shorthand method for backwards compatibility in creating metric names.
*
* Uses {@link MetricName#build(String...)} for its
* heavy lifting.
*
* @see MetricName#build(String...)
* @param name The first element of the name
* @param names The remaining elements of the name
* @return A metric name matching the specified components.
*/
public static MetricName name(String name, String... names) {
final int length;

if (names == null) {
length = 0;
} else {
length = names.length;
}

final String[] parts = new String[length + 1];
parts[0] = name;

for (int i = 0; i < length; i++) {
parts[i+1] = names[i];
}

return MetricName.build(parts);
}

//监测tag名字,监测实现类
private final ConcurrentMap<MetricName, Metric> metrics;
//监听对象列表
private final List<MetricRegistryListener> listeners;

/**
* Creates a new {@link MetricRegistry}.
*/
public MetricRegistry() {
this(new ConcurrentHashMap<MetricName, Metric>());
}

/**
* Creates a {@link MetricRegistry} with a custom {@link ConcurrentMap} implementation for use
* inside the registry. Call as the super-constructor to create a {@link MetricRegistry} with
* space- or time-bounded metric lifecycles, for example.
*/
protected MetricRegistry(ConcurrentMap<MetricName, Metric> metricsMap) {
this.metrics = metricsMap;
this.listeners = new CopyOnWriteArrayList<MetricRegistryListener>();
}

/**
* @see #register(MetricName, Metric)
*/
@SuppressWarnings("unchecked")
public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
return register(MetricName.build(name), metric);
}

/**
* Given a {@link Metric}, registers it under the given name.
* 注册
*
* @param name the name of the metric
* @param metric the metric
* @param <T> the type of the metric
* @return {@code metric}
* @throws IllegalArgumentException if the name is already registered
*/
@SuppressWarnings("unchecked")
public <T extends Metric> T register(MetricName name, T metric) throws IllegalArgumentException {
//注册metric列表
if (metric instanceof MetricSet) {
registerAll(name, (MetricSet) metric);
} else {
//存在metric
final Metric existing = metrics.putIfAbsent(name, metric);
if (existing == null) {
onMetricAdded(name, metric);
} else {
throw new IllegalArgumentException("A metric named " + name + " already exists");
}
}

return metric;
}

/**
* Given a metric set, registers them.
*
* @param metrics a set of metrics
* @throws IllegalArgumentException if any of the names are already registered
*/
public void registerAll(MetricSet metrics) throws IllegalArgumentException {
registerAll(null, metrics);
}

/**
* @see #counter(MetricName)
*/
public Counter counter(String name) {
return counter(MetricName.build(name));
}

/**
* Return the {@link Counter} registered under this name; or create and register
* a new {@link Counter} if none is registered.
*
* @param name the name of the metric
* @return a new or pre-existing {@link Counter}
*/
public Counter counter(MetricName name) {
return getOrAdd(name, MetricBuilder.COUNTERS);
}

/**
* @see #histogram(MetricName)
*/
public Histogram histogram(String name) {
return histogram(MetricName.build(name));
}

/**
* Return the {@link Histogram} registered under this name; or create and register
* a new {@link Histogram} if none is registered.
*
* @param name the name of the metric
* @return a new or pre-existing {@link Histogram}
*/
public Histogram histogram(MetricName name) {
return getOrAdd(name, MetricBuilder.HISTOGRAMS);
}

/**
* @see #meter(MetricName)
*/
public Meter meter(String name) {
return meter(MetricName.build(name));
}

/**
* Return the {@link Meter} registered under this name; or create and register
* a new {@link Meter} if none is registered.
*
* @param name the name of the metric
* @return a new or pre-existing {@link Meter}
*/
public Meter meter(MetricName name) {
return getOrAdd(name, MetricBuilder.METERS);
}

/**
* @see #timer(MetricName)
*/
public Timer timer(String name) {
return timer(MetricName.build(name));
}

/**
* Return the {@link Timer} registered under this name; or create and register
* a new {@link Timer} if none is registered.
*
* @param name the name of the metric
* @return a new or pre-existing {@link Timer}
*/
public Timer timer(MetricName name) {
return getOrAdd(name, MetricBuilder.TIMERS);
}

/**
* Removes the metric with the given name.
*
* @param name the name of the metric
* @return whether or not the metric was removed
*/
public boolean remove(MetricName name) {
final Metric metric = metrics.remove(name);
if (metric != null) {
onMetricRemoved(name, metric);
return true;
}
return false;
}

/**
* Removes all metrics which match the given filter.
*
* @param filter a filter
* @return whether or not a metric was removed
*/
public boolean removeMatching(MetricFilter filter) {
boolean removed = false;
for (Map.Entry<MetricName, Metric> entry : metrics.entrySet()) {
if (filter.matches(entry.getKey(), entry.getValue())) {
removed |= remove(entry.getKey());
}
}
return removed;
}

/**
* Adds a {@link MetricRegistryListener} to a collection of listeners that will be notified on
* metric creation. Listeners will be notified in the order in which they are added.
* <p/>
* <b>N.B.:</b> The listener will be notified of all existing metrics when it first registers.
*
* @param listener the listener that will be notified
*/
public void addListener(MetricRegistryListener listener) {
listeners.add(listener);

for (Map.Entry<MetricName, Metric> entry : metrics.entrySet()) {
notifyListenerOfAddedMetric(listener, entry.getValue(), entry.getKey());
}
}

/**
* Removes a {@link MetricRegistryListener} from this registry's collection of listeners.
*
* @param listener the listener that will be removed
*/
public void removeListener(MetricRegistryListener listener) {
listeners.remove(listener);
}

/**
* Returns a set of the names of all the metrics in the registry.
*
* @return the names of all the metrics
*/
public SortedSet<MetricName> getNames() {
return Collections.unmodifiableSortedSet(new TreeSet<MetricName>(metrics.keySet()));
}

/**
* Returns a map of all the gauges in the registry and their names.
*
* @return all the gauges in the registry
*/
public SortedMap<MetricName, Gauge> getGauges() {
return getGauges(MetricFilter.ALL);
}

/**
* Returns a map of all the gauges in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @return all the gauges in the registry
*/
public SortedMap<MetricName, Gauge> getGauges(MetricFilter filter) {
return getMetrics(Gauge.class, filter);
}

/**
* Returns a map of all the counters in the registry and their names.
*
* @return all the counters in the registry
*/
public SortedMap<MetricName, Counter> getCounters() {
return getCounters(MetricFilter.ALL);
}

/**
* Returns a map of all the counters in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @return all the counters in the registry
*/
public SortedMap<MetricName, Counter> getCounters(MetricFilter filter) {
return getMetrics(Counter.class, filter);
}

/**
* Returns a map of all the histograms in the registry and their names.
*
* @return all the histograms in the registry
*/
public SortedMap<MetricName, Histogram> getHistograms() {
return getHistograms(MetricFilter.ALL);
}

/**
* Returns a map of all the histograms in the registry and their names which match the given
* filter.
*
* @param filter the metric filter to match
* @return all the histograms in the registry
*/
public SortedMap<MetricName, Histogram> getHistograms(MetricFilter filter) {
return getMetrics(Histogram.class, filter);
}

/**
* Returns a map of all the meters in the registry and their names.
*
* @return all the meters in the registry
*/
public SortedMap<MetricName, Meter> getMeters() {
return getMeters(MetricFilter.ALL);
}

/**
* Returns a map of all the meters in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @return all the meters in the registry
*/
public SortedMap<MetricName, Meter> getMeters(MetricFilter filter) {
return getMetrics(Meter.class, filter);
}

/**
* Returns a map of all the timers in the registry and their names.
*
* @return all the timers in the registry
*/
public SortedMap<MetricName, Timer> getTimers() {
return getTimers(MetricFilter.ALL);
}

/**
* Returns a map of all the timers in the registry and their names which match the given filter.
*
* @param filter the metric filter to match
* @return all the timers in the registry
*/
public SortedMap<MetricName, Timer> getTimers(MetricFilter filter) {
return getMetrics(Timer.class, filter);
}

/**
* 获取metric,如果没有metric,则新增
* @param name
* @param builder
* @param <T>
* @return
*/
@SuppressWarnings("unchecked")
private <T extends Metric> T getOrAdd(MetricName name, MetricBuilder<T> builder) {
final Metric metric = metrics.get(name);
if (builder.isInstance(metric)) {
return (T) metric;
} else if (metric == null) {
try {
return register(name, builder.newMetric());
} catch (IllegalArgumentException e) {
final Metric added = metrics.get(name);
if (builder.isInstance(added)) {
return (T) added;
}
}
}
throw new IllegalArgumentException(name + " is already used for a different type of metric");
}

@SuppressWarnings("unchecked")
private <T extends Metric> SortedMap<MetricName, T> getMetrics(Class<T> klass, MetricFilter filter) {
final TreeMap<MetricName, T> timers = new TreeMap<MetricName, T>();
for (Map.Entry<MetricName, Metric> entry : metrics.entrySet()) {
if (klass.isInstance(entry.getValue()) && filter.matches(entry.getKey(),
entry.getValue())) {
timers.put(entry.getKey(), (T) entry.getValue());
}
}
return Collections.unmodifiableSortedMap(timers);
}

/**
* 新增metric
* @param name
* @param metric
*/
private void onMetricAdded(MetricName name, Metric metric) {
for (MetricRegistryListener listener : listeners) {
notifyListenerOfAddedMetric(listener, metric, name);
}
}

/**
* 根据类型新增metric
* @param listener
* @param metric
* @param name
*/
private void notifyListenerOfAddedMetric(MetricRegistryListener listener, Metric metric, MetricName name) {
if (metric instanceof Gauge) {
listener.onGaugeAdded(name, (Gauge<?>) metric);
} else if (metric instanceof Counter) {
listener.onCounterAdded(name, (Counter) metric);
} else if (metric instanceof Histogram) {
listener.onHistogramAdded(name, (Histogram) metric);
} else if (metric instanceof Meter) {
listener.onMeterAdded(name, (Meter) metric);
} else if (metric instanceof Timer) {
listener.onTimerAdded(name, (Timer) metric);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}

/**
* 删除
* @param name
* @param metric
*/
private void onMetricRemoved(MetricName name, Metric metric) {
for (MetricRegistryListener listener : listeners) {
notifyListenerOfRemovedMetric(name, metric, listener);
}
}

private void notifyListenerOfRemovedMetric(MetricName name, Metric metric, MetricRegistryListener listener) {
if (metric instanceof Gauge) {
listener.onGaugeRemoved(name);
} else if (metric instanceof Counter) {
listener.onCounterRemoved(name);
} else if (metric instanceof Histogram) {
listener.onHistogramRemoved(name);
} else if (metric instanceof Meter) {
listener.onMeterRemoved(name);
} else if (metric instanceof Timer) {
listener.onTimerRemoved(name);
} else {
throw new IllegalArgumentException("Unknown metric type: " + metric.getClass());
}
}

/**
* 注册metric列表
* @param prefix
* @param metrics
* @throws IllegalArgumentException
*/
private void registerAll(MetricName prefix, MetricSet metrics) throws IllegalArgumentException {
if (prefix == null)
prefix = MetricName.EMPTY;

for (Map.Entry<MetricName, Metric> entry : metrics.getMetrics().entrySet()) {
if (entry.getValue() instanceof MetricSet) {
registerAll(MetricName.join(prefix, entry.getKey()), (MetricSet) entry.getValue());
} else {
register(MetricName.join(prefix, entry.getKey()), entry.getValue());
}
}
}

/**
* 返回metricsMap,一个不能修改的map
* @return
*/
@Override
public Map<MetricName, Metric> getMetrics() {
return Collections.unmodifiableMap(metrics);
}

/**
* A quick and easy way of capturing the notion of default metrics.
* 简单快速自定义接口
* 实现不同metric类型
*
*/
private interface MetricBuilder<T extends Metric> {
//counters
MetricBuilder<Counter> COUNTERS = new MetricBuilder<Counter>() {
@Override
public Counter newMetric() {
return new Counter();
}

@Override
public boolean isInstance(Metric metric) {
return Counter.class.isInstance(metric);
}
};
//histograms
MetricBuilder<Histogram> HISTOGRAMS = new MetricBuilder<Histogram>() {
@Override
public Histogram newMetric() {
return new Histogram(new ExponentiallyDecayingReservoir());
}

@Override
public boolean isInstance(Metric metric) {
return Histogram.class.isInstance(metric);
}
};
//meters
MetricBuilder<Meter> METERS = new MetricBuilder<Meter>() {
@Override
public Meter newMetric() {
return new Meter();
}

@Override
public boolean isInstance(Metric metric) {
return Meter.class.isInstance(metric);
}
};
//timers
MetricBuilder<Timer> TIMERS = new MetricBuilder<Timer>() {
@Override
public Timer newMetric() {
return new Timer();
}

@Override
public boolean isInstance(Metric metric) {
return Timer.class.isInstance(metric);
}
};

T newMetric();

boolean isInstance(Metric metric);
}
}

Snapshot.java

统计快照:提供分位数、最大值、最小值、均值、标准差等统计值,并可将数据输出到流中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package io.dropwizard.metrics;

import java.io.OutputStream;

/**
 * A statistical snapshot of a set of recorded values.
 *
 * <p>Subclasses supply the raw statistics (quantile lookup, min, max, mean, standard
 * deviation, the values themselves); the percentile convenience getters defined here
 * all delegate to {@link #getValue(double)}.
 */
public abstract class Snapshot {

    // Quantiles behind the convenience getters below.
    private static final double MEDIAN_QUANTILE = 0.5;
    private static final double P75_QUANTILE = 0.75;
    private static final double P95_QUANTILE = 0.95;
    private static final double P98_QUANTILE = 0.98;
    private static final double P99_QUANTILE = 0.99;
    private static final double P999_QUANTILE = 0.999;

    /**
     * Looks up the value found at a given quantile of the distribution.
     *
     * @param quantile the quantile of interest, in {@code [0..1]}
     * @return the value in the distribution at {@code quantile}
     */
    public abstract double getValue(double quantile);

    /**
     * Provides every value held by the snapshot.
     *
     * @return the entire set of values
     */
    public abstract long[] getValues();

    /**
     * Reports how many values the snapshot holds.
     *
     * @return the number of values
     */
    public abstract int size();

    /**
     * Returns the median of the distribution, i.e. the value at quantile 0.5.
     *
     * @return the median value
     */
    public double getMedian() {
        return getValue(MEDIAN_QUANTILE);
    }

    /**
     * Returns the 75th percentile of the distribution.
     *
     * @return the value at quantile 0.75
     */
    public double get75thPercentile() {
        return getValue(P75_QUANTILE);
    }

    /**
     * Returns the 95th percentile of the distribution.
     *
     * @return the value at quantile 0.95
     */
    public double get95thPercentile() {
        return getValue(P95_QUANTILE);
    }

    /**
     * Returns the 98th percentile of the distribution.
     *
     * @return the value at quantile 0.98
     */
    public double get98thPercentile() {
        return getValue(P98_QUANTILE);
    }

    /**
     * Returns the 99th percentile of the distribution.
     *
     * @return the value at quantile 0.99
     */
    public double get99thPercentile() {
        return getValue(P99_QUANTILE);
    }

    /**
     * Returns the 99.9th percentile of the distribution.
     *
     * @return the value at quantile 0.999
     */
    public double get999thPercentile() {
        return getValue(P999_QUANTILE);
    }

    /**
     * Returns the largest value in the snapshot.
     *
     * @return the highest value
     */
    public abstract long getMax();

    /**
     * Returns the arithmetic mean of the values in the snapshot.
     *
     * @return the arithmetic mean
     */
    public abstract double getMean();

    /**
     * Returns the smallest value in the snapshot.
     *
     * @return the lowest value
     */
    public abstract long getMin();

    /**
     * Returns the standard deviation of the values in the snapshot.
     *
     * @return the standard deviation
     */
    public abstract double getStdDev();

    /**
     * Writes the values of the snapshot to the given stream.
     *
     * @param output the stream that receives the values
     */
    public abstract void dump(OutputStream output);

}

InstrumentedExecutors.java

工厂工具类:对 java.util.concurrent.Executors 创建的线程池和线程工厂进行包装,返回 InstrumentedExecutorService、InstrumentedScheduledExecutorService 或 InstrumentedThreadFactory 实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
package io.dropwizard.metrics;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Factory and utility methods for {@link InstrumentedExecutorService},
* {@link InstrumentedScheduledExecutorService}, and {@link InstrumentedThreadFactory}
* classes defined in this package. This class supports the following kinds of methods:
* <p>
* <ul>
* <li> Methods that create and return an {@link InstrumentedExecutorService}
* set up with commonly useful configuration settings.
* <li> Methods that create and return a {@link InstrumentedScheduledExecutorService}
* set up with commonly useful configuration settings.
* <li> Methods that create and return a "wrapped" ExecutorService, that
* disables reconfiguration by making implementation-specific methods
* inaccessible.
* <li> Methods that create and return a {@link InstrumentedThreadFactory}
* that sets newly created threads to a known state.
* </ul>
* </p>
*
* @see java.util.concurrent.Executors
*/
//线程执行服务
//创建线程池执行:newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool
public final class InstrumentedExecutors {
/**
* Creates an instrumented thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* 创建线程池
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
* @see Executors#newFixedThreadPool(int)
*/
public static InstrumentedExecutorService newFixedThreadPool(int nThreads, MetricRegistry registry, String name) {
return new InstrumentedExecutorService(Executors.newFixedThreadPool(nThreads), registry, name);
}

/**
* Creates an instrumented thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link java.util.concurrent.ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
* @see Executors#newFixedThreadPool(int)
*/
public static InstrumentedExecutorService newFixedThreadPool(int nThreads, MetricRegistry registry) {
return new InstrumentedExecutorService(Executors.newFixedThreadPool(nThreads), registry);
}

/**
* Creates an instrumented thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
* @see Executors#newFixedThreadPool(int, ThreadFactory)
*/
public static InstrumentedExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, MetricRegistry registry, String name) {
return new InstrumentedExecutorService(Executors.newFixedThreadPool(nThreads, threadFactory), registry, name);
}

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
* @see Executors#newFixedThreadPool(int, ThreadFactory)
*/
public static InstrumentedExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory, MetricRegistry registry) {
return new InstrumentedExecutorService(Executors.newFixedThreadPool(nThreads, threadFactory), registry);
}

/**
* Creates an InstrumentedExecutor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created single-threaded Executor
* @see Executors#newSingleThreadExecutor()
*/
public static InstrumentedExecutorService newSingleThreadExecutor(MetricRegistry registry, String name) {
//单线程 - uses a single worker thread operating
return new InstrumentedExecutorService(Executors.newSingleThreadExecutor(), registry, name);
}

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created single-threaded Executor
* @see Executors#newSingleThreadExecutor()
*/
public static InstrumentedExecutorService newSingleThreadExecutor(MetricRegistry registry) {
return new InstrumentedExecutorService(Executors.newSingleThreadExecutor(), registry);
}

/**
* Creates an InstrumentedExecutor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
* @see Executors#newSingleThreadExecutor(ThreadFactory)
*/
public static InstrumentedExecutorService newSingleThreadExecutor(ThreadFactory threadFactory, MetricRegistry registry, String name) {
return new InstrumentedExecutorService(Executors.newSingleThreadExecutor(threadFactory), registry, name);
}

/**
* Creates an InstrumentedExecutor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
* @see Executors#newSingleThreadExecutor(ThreadFactory)
*/
public static InstrumentedExecutorService newSingleThreadExecutor(ThreadFactory threadFactory, MetricRegistry registry) {
return new InstrumentedExecutorService(Executors.newSingleThreadExecutor(threadFactory), registry);
}

/**
* Creates an instrumented thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created thread pool
* @see Executors#newCachedThreadPool()
*/
public static InstrumentedExecutorService newCachedThreadPool(MetricRegistry registry, String name) {
return new InstrumentedExecutorService(Executors.newCachedThreadPool(), registry, name);
}

/**
* Creates an instrumented thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created thread pool
* @see Executors#newCachedThreadPool()
*/
public static InstrumentedExecutorService newCachedThreadPool(MetricRegistry registry) {
return new InstrumentedExecutorService(Executors.newCachedThreadPool(), registry);
}

/**
* Creates an instrumented thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
*
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @see Executors#newCachedThreadPool(ThreadFactory)
*/
public static InstrumentedExecutorService newCachedThreadPool(ThreadFactory threadFactory, MetricRegistry registry, String name) {
return new InstrumentedExecutorService(Executors.newCachedThreadPool(threadFactory), registry, name);
}

/**
* Creates an instrumented thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
*
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @see Executors#newCachedThreadPool(ThreadFactory)
*/
public static InstrumentedExecutorService newCachedThreadPool(ThreadFactory threadFactory, MetricRegistry registry) {
return new InstrumentedExecutorService(Executors.newCachedThreadPool(threadFactory), registry);
}

/**
* Creates a single-threaded instrumented executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return the newly created scheduled executor
* @see Executors#newSingleThreadScheduledExecutor()
*/
public static InstrumentedScheduledExecutorService newSingleThreadScheduledExecutor(MetricRegistry registry, String name) {
return new InstrumentedScheduledExecutorService(Executors.newSingleThreadScheduledExecutor(), registry, name);
}

/**
* Creates a single-threaded instrumented executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return the newly created scheduled executor
* @see Executors#newSingleThreadScheduledExecutor()
*/
public static InstrumentedScheduledExecutorService newSingleThreadScheduledExecutor(MetricRegistry registry) {
return new InstrumentedScheduledExecutorService(Executors.newSingleThreadScheduledExecutor(), registry);
}

/**
* Creates a single-threaded instrumented executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
*
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
* @see Executors#newSingleThreadExecutor(ThreadFactory)
*/
public static InstrumentedScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory, MetricRegistry registry, String name) {
return new InstrumentedScheduledExecutorService(Executors.newSingleThreadScheduledExecutor(threadFactory), registry, name);
}

/**
* Creates a single-threaded instrumented executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
*
* @param threadFactory the factory to use when creating new threads
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
* @see Executors#newSingleThreadExecutor(ThreadFactory)
*/
public static InstrumentedScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory, MetricRegistry registry) {
return new InstrumentedScheduledExecutorService(Executors.newSingleThreadScheduledExecutor(threadFactory), registry);
}

/**
* Creates an instrumented thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
*
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see Executors#newScheduledThreadPool(int)
*/
public static InstrumentedScheduledExecutorService newScheduledThreadPool(int corePoolSize, MetricRegistry registry, String name) {
return new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(corePoolSize), registry, name);
}

/**
* Creates an instrumented thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
*
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see Executors#newScheduledThreadPool(int)
*/
public static InstrumentedScheduledExecutorService newScheduledThreadPool(int corePoolSize, MetricRegistry registry) {
return new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(corePoolSize), registry);
}

/**
* Creates an instrumented thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
*
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
* @see Executors#newScheduledThreadPool(int, ThreadFactory)
*/
public static InstrumentedScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory, MetricRegistry registry, String name) {
return new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(corePoolSize, threadFactory), registry, name);
}

/**
* Creates an instrumented thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
*
* @param corePoolSize the number of threads to keep in the pool, even if they are idle
* @param threadFactory the factory to use when the executor creates a new thread
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
* @see Executors#newScheduledThreadPool(int, ThreadFactory)
*/
public static InstrumentedScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory, MetricRegistry registry) {
return new InstrumentedScheduledExecutorService(Executors.newScheduledThreadPool(corePoolSize, threadFactory), registry);
}

/**
* Returns an instrumented default thread factory used to create new threads.
* This factory creates all new threads used by an Executor in the
* same {@link ThreadGroup}. If there is a {@link
* java.lang.SecurityManager}, it uses the group of {@link
* System#getSecurityManager}, else the group of the thread
* invoking this {@code defaultThreadFactory} method. Each new
* thread is created as a non-daemon thread with priority set to
* the smaller of {@code Thread.NORM_PRIORITY} and the maximum
* priority permitted in the thread group. New threads have names
* accessible via {@link Thread#getName} of
* <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
* number of this factory, and <em>M</em> is the sequence number
* of the thread created by this factory.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return a thread factory
* @see Executors#defaultThreadFactory()
*/
public static InstrumentedThreadFactory defaultThreadFactory(MetricRegistry registry, String name) {
return new InstrumentedThreadFactory(Executors.defaultThreadFactory(), registry, name);
}

/**
* Returns an instrumented default thread factory used to create new threads.
* This factory creates all new threads used by an Executor in the
* same {@link ThreadGroup}. If there is a {@link
* java.lang.SecurityManager}, it uses the group of {@link
* System#getSecurityManager}, else the group of the thread
* invoking this {@code defaultThreadFactory} method. Each new
* thread is created as a non-daemon thread with priority set to
* the smaller of {@code Thread.NORM_PRIORITY} and the maximum
* priority permitted in the thread group. New threads have names
* accessible via {@link Thread#getName} of
* <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
* number of this factory, and <em>M</em> is the sequence number
* of the thread created by this factory.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @return a thread factory
* @see Executors#defaultThreadFactory()
*/
public static InstrumentedThreadFactory defaultThreadFactory(MetricRegistry registry) {
return new InstrumentedThreadFactory(Executors.defaultThreadFactory(), registry);
}

/**
* Returns an instrumented thread factory used to create new threads that
* have the same permissions as the current thread.
* <p>
* This factory creates threads with the same settings as {@link
* Executors#defaultThreadFactory}, additionally setting the
* AccessControlContext and contextClassLoader of new threads to
* be the same as the thread invoking this
* {@code privilegedThreadFactory} method. A new
* {@code privilegedThreadFactory} can be created within an
* {@link java.security.AccessController#doPrivileged AccessController.doPrivileged}
* action setting the current thread's access control context to
* create threads with the selected permission settings holding
* within that action.
* </p>
* <p>Note that while tasks running within such threads will have
* the same access control and class loader settings as the
* current thread, they need not have the same {@link
* java.lang.ThreadLocal} or {@link
* java.lang.InheritableThreadLocal} values. If necessary,
* particular values of thread locals can be set or reset before
* any task runs in {@link ThreadPoolExecutor} subclasses using
* {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}.
* Also, if it is necessary to initialize worker threads to have
* the same InheritableThreadLocal settings as some other
* designated thread, you can create a custom ThreadFactory in
* which that thread waits for and services requests to create
* others that will inherit its values.
*
* @param registry the {@link MetricRegistry} that will contain the metrics.
* @param name the (metrics) name for this executor service, see {@link MetricRegistry#name(String, String...)}.
* @return a thread factory
* @throws java.security.AccessControlException if the current access control
* context does not have permission to both get and set context
* class loader
* @see Executors#privilegedThreadFactory()
*/
public static InstrumentedThreadFactory privilegedThreadFactory(MetricRegistry registry, String name) {
return new InstrumentedThreadFactory(Executors.privilegedThreadFactory(), registry, name);
}

/**
 * Creates an instrumented thread factory whose threads have the same permissions as the
 * thread calling this method. The metrics are registered under an auto-generated name.
 * <p>
 * The wrapped factory is {@link Executors#privilegedThreadFactory()}: it uses the same settings
 * as {@link Executors#defaultThreadFactory}, and additionally gives every new thread the
 * AccessControlContext and contextClassLoader of the thread that invoked this
 * {@code privilegedThreadFactory} method. Calling this method inside an
 * {@link java.security.AccessController#doPrivileged AccessController.doPrivileged} action
 * therefore yields threads with the permission settings that hold within that action.
 * </p>
 * <p>
 * Tasks run by such threads share the caller's access control and class loader settings, but
 * not necessarily its {@link java.lang.ThreadLocal} or {@link java.lang.InheritableThreadLocal}
 * values. If needed, thread-local values can be set or reset before each task in a
 * {@link ThreadPoolExecutor} subclass via
 * {@link ThreadPoolExecutor#beforeExecute(Thread, Runnable)}. To give worker threads the
 * InheritableThreadLocal settings of one designated thread, use a custom ThreadFactory in which
 * that thread waits for and services requests to create the others, so they inherit its values.
 * </p>
 *
 * @param registry the {@link MetricRegistry} that will contain the metrics.
 * @return a thread factory
 * @throws java.security.AccessControlException if the current access control
 *                                              context does not have permission to both get and
 *                                              set context class loader
 * @see Executors#privilegedThreadFactory()
 */
public static InstrumentedThreadFactory privilegedThreadFactory(MetricRegistry registry) {
    return new InstrumentedThreadFactory(
            Executors.privilegedThreadFactory(),
            registry);
}

/**
 * Private so that this class cannot be instantiated from outside.
 */
private InstrumentedExecutors() {
    // intentionally empty
}
}

InstrumentedThreadFactory.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package io.dropwizard.metrics;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link ThreadFactory} decorator that monitors the number of threads created, running and
 * terminated.
 * <p>
 * The metrics are registered using the given (or auto-generated) name as classifier, e.g:
 * "your-thread-delegate.created", "your-thread-delegate.running", etc.
 */
public class InstrumentedThreadFactory implements ThreadFactory {
    /** Source of the numeric suffix used for auto-generated names. */
    private static final AtomicLong nameCounter = new AtomicLong();

    /** The factory that actually creates the threads. */
    private final ThreadFactory delegate;

    /** Marked once for every call to {@link #newThread(Runnable)}. */
    private final Meter created;

    /** Incremented while a wrapped runnable is executing, decremented when it returns. */
    private final Counter running;

    /** Marked each time a wrapped runnable finishes, whether normally or by throwing. */
    private final Meter terminated;

    /**
     * Wraps a {@link ThreadFactory} and registers its metrics under an auto-generated name.
     *
     * @param delegate {@link ThreadFactory} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     */
    public InstrumentedThreadFactory(ThreadFactory delegate, MetricRegistry registry) {
        this(delegate, registry, nextDefaultName());
    }

    /**
     * Wraps a {@link ThreadFactory} and registers its metrics under an explicit name.
     *
     * @param delegate {@link ThreadFactory} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     * @param name     name for this delegate.
     */
    public InstrumentedThreadFactory(ThreadFactory delegate, MetricRegistry registry, String name) {
        this.delegate = delegate;
        this.created = registry.meter(MetricRegistry.name(name, "created"));
        this.running = registry.counter(MetricRegistry.name(name, "running"));
        this.terminated = registry.meter(MetricRegistry.name(name, "terminated"));
    }

    /** Builds the next auto-generated name, e.g. "instrumented-thread-delegate-1". */
    private static String nextDefaultName() {
        return "instrumented-thread-delegate-" + nameCounter.incrementAndGet();
    }

    /**
     * {@inheritDoc}
     * <p>
     * The runnable is wrapped so that its execution is tracked, the delegate creates the thread,
     * and the "created" meter is marked.
     */
    @Override
    public Thread newThread(Runnable runnable) {
        final Thread thread = delegate.newThread(new InstrumentedRunnable(runnable));
        created.mark();
        return thread;
    }

    /** Runnable wrapper that updates the "running" and "terminated" metrics around the task. */
    private class InstrumentedRunnable implements Runnable {
        private final Runnable task;

        InstrumentedRunnable(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            running.inc();
            try {
                task.run();
            } finally {
                // Executed even when the task throws, so the counters stay consistent.
                running.dec();
                terminated.mark();
            }
        }
    }
}

InstrumentedExecutorService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package io.dropwizard.metrics;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An {@link ExecutorService} decorator that monitors the number of tasks submitted, running,
 * completed and rejected, and keeps a {@link Timer} for the task duration.
 * <p>
 * The metrics are registered using the given (or auto-generated) name as classifier, e.g:
 * "your-executor-service.submitted", "your-executor-service.running", etc.
 */
public class InstrumentedExecutorService implements ExecutorService {
    /** Source of the numeric suffix used for auto-generated names. */
    private static final AtomicLong nameCounter = new AtomicLong();

    /** The executor that actually runs the tasks. */
    private final ExecutorService delegate;

    /** Marked for every task handed to this executor. */
    private final Meter submitted;
    /** Number of wrapped tasks currently executing. */
    private final Counter running;
    /** Marked each time a wrapped task finishes, whether normally or by throwing. */
    private final Meter completed;
    /** Times each wrapped task's execution. */
    private final Timer duration;
    /** Marked when the delegate throws {@link RejectedExecutionException}. */
    private final Meter rejected;

    /**
     * Wraps an {@link ExecutorService} and registers its metrics under an auto-generated name.
     *
     * @param delegate {@link ExecutorService} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     */
    public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry) {
        this(delegate, registry, "instrumented-delegate-" + nameCounter.incrementAndGet());
    }

    /**
     * Wraps an {@link ExecutorService} and registers its metrics under an explicit name.
     *
     * @param delegate {@link ExecutorService} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     * @param name     name for this executor service.
     */
    public InstrumentedExecutorService(ExecutorService delegate, MetricRegistry registry, String name) {
        this.delegate = delegate;
        this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));
        this.running = registry.counter(MetricRegistry.name(name, "running"));
        this.completed = registry.meter(MetricRegistry.name(name, "completed"));
        this.duration = registry.timer(MetricRegistry.name(name, "duration"));
        this.rejected = registry.meter(MetricRegistry.name(name, "rejected"));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable runnable) {
        submitted.mark();
        final Runnable wrapped = new InstrumentedRunnable(runnable);
        try {
            delegate.execute(wrapped);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable runnable) {
        submitted.mark();
        final Runnable wrapped = new InstrumentedRunnable(runnable);
        try {
            return delegate.submit(wrapped);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable runnable, T result) {
        submitted.mark();
        final Runnable wrapped = new InstrumentedRunnable(runnable);
        try {
            return delegate.submit(wrapped, result);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        submitted.mark();
        final Callable<T> wrapped = new InstrumentedCallable<T>(task);
        try {
            return delegate.submit(wrapped);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Every task counts as one submission; a rejection by the delegate is marked once.
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        submitted.mark(tasks.size());
        final Collection<? extends Callable<T>> wrappedTasks = instrument(tasks);
        try {
            return delegate.invokeAll(wrappedTasks);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        submitted.mark(tasks.size());
        final Collection<? extends Callable<T>> wrappedTasks = instrument(tasks);
        try {
            return delegate.invokeAll(wrappedTasks, timeout, unit);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws ExecutionException, InterruptedException {
        submitted.mark(tasks.size());
        final Collection<? extends Callable<T>> wrappedTasks = instrument(tasks);
        try {
            return delegate.invokeAny(wrappedTasks);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
        submitted.mark(tasks.size());
        final Collection<? extends Callable<T>> wrappedTasks = instrument(tasks);
        try {
            return delegate.invokeAny(wrappedTasks, timeout, unit);
        } catch (RejectedExecutionException e) {
            rejected.mark();
            throw e;
        }
    }

    /**
     * Wraps every callable of the given collection in an {@link InstrumentedCallable}.
     *
     * @param tasks the callables to wrap
     * @param <T>   the result type of the callables
     * @return the wrapped callables, in the iteration order of {@code tasks}
     */
    private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Callable<T>> tasks) {
        // Pre-sized: exactly one wrapper is added per input task.
        final List<InstrumentedCallable<T>> wrappedTasks = new ArrayList<InstrumentedCallable<T>>(tasks.size());
        for (Callable<T> original : tasks) {
            wrappedTasks.add(new InstrumentedCallable<T>(original));
        }
        return wrappedTasks;
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    @Override
    public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException {
        return delegate.awaitTermination(l, timeUnit);
    }

    /** Runnable wrapper that updates the running/duration/completed metrics around the task. */
    private class InstrumentedRunnable implements Runnable {
        private final Runnable task;

        InstrumentedRunnable(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            running.inc();
            final Timer.Context timing = duration.time();
            try {
                task.run();
            } finally {
                // Executed even when the task throws, so the metrics stay consistent.
                timing.stop();
                running.dec();
                completed.mark();
            }
        }
    }

    /** Callable wrapper that updates the running/duration/completed metrics around the call. */
    private class InstrumentedCallable<T> implements Callable<T> {
        private final Callable<T> callable;

        InstrumentedCallable(Callable<T> callable) {
            this.callable = callable;
        }

        @Override
        public T call() throws Exception {
            running.inc();
            final Timer.Context timing = duration.time();
            try {
                return callable.call();
            } finally {
                // Executed even when the call throws, so the metrics stay consistent.
                timing.stop();
                running.dec();
                completed.mark();
            }
        }
    }
}

InstrumentedScheduledExecutorService.java

包装ScheduledExecutorService类,嵌入监控对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package io.dropwizard.metrics;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**
 * A {@link ScheduledExecutorService} decorator that monitors the number of tasks submitted,
 * running and completed, and keeps a {@link Timer} for the task duration.
 * <p>
 * The metrics are registered using the given (or auto-generated) name as classifier, e.g:
 * "your-executor-service.submitted", "your-executor-service.running", etc.
 */
public class InstrumentedScheduledExecutorService implements ScheduledExecutorService {
    /** Source of the numeric suffix used for auto-generated names. */
    private static final AtomicLong nameCounter = new AtomicLong();

    /** The executor that actually schedules and runs the tasks. */
    private final ScheduledExecutorService delegate;

    /** Marked for every task passed to execute/submit/invokeAll/invokeAny. */
    private final Meter submitted;
    /** Number of wrapped tasks currently executing. */
    private final Counter running;
    /** Marked each time a wrapped task finishes, whether normally or by throwing. */
    private final Meter completed;
    /** Times each wrapped task's execution. */
    private final Timer duration;
    /** Marked for every one-shot {@code schedule(...)} call. */
    private final Meter scheduledOnce;
    /** Marked for every {@code scheduleAtFixedRate} / {@code scheduleWithFixedDelay} call. */
    private final Meter scheduledRepetitively;
    /** Incremented when a fixed-rate run takes longer than its period. */
    private final Counter scheduledOverrun;
    /** Run time of each fixed-rate run, as a percentage of its period. */
    private final Histogram percentOfPeriod;

    /**
     * Wraps a {@link ScheduledExecutorService} and registers its metrics under an
     * auto-generated name.
     *
     * @param delegate {@link ScheduledExecutorService} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     */
    public InstrumentedScheduledExecutorService(ScheduledExecutorService delegate, MetricRegistry registry) {
        this(delegate, registry, "instrumented-scheduled-executor-service-" + nameCounter.incrementAndGet());
    }

    /**
     * Wraps a {@link ScheduledExecutorService} and registers its metrics under an explicit name.
     *
     * @param delegate {@link ScheduledExecutorService} to wrap.
     * @param registry {@link MetricRegistry} that will contain the metrics.
     * @param name     name for this executor service.
     */
    public InstrumentedScheduledExecutorService(ScheduledExecutorService delegate, MetricRegistry registry, String name) {
        this.delegate = delegate;

        this.submitted = registry.meter(MetricRegistry.name(name, "submitted"));

        this.running = registry.counter(MetricRegistry.name(name, "running"));
        this.completed = registry.meter(MetricRegistry.name(name, "completed"));
        this.duration = registry.timer(MetricRegistry.name(name, "duration"));

        this.scheduledOnce = registry.meter(MetricRegistry.name(name, "scheduled.once"));
        this.scheduledRepetitively = registry.meter(MetricRegistry.name(name, "scheduled.repetitively"));
        this.scheduledOverrun = registry.counter(MetricRegistry.name(name, "scheduled.overrun"));
        this.percentOfPeriod = registry.histogram(MetricRegistry.name(name, "scheduled.percent-of-period"));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        scheduledOnce.mark();
        return delegate.schedule(new InstrumentedRunnable(command), delay, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        scheduledOnce.mark();
        return delegate.schedule(new InstrumentedCallable<V>(callable), delay, unit);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Each run is additionally compared against {@code period} to feed the overrun counter and
     * the percent-of-period histogram.
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        scheduledRepetitively.mark();
        return delegate.scheduleAtFixedRate(new InstrumentedPeriodicRunnable(command, period, unit), initialDelay, period, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        scheduledRepetitively.mark();
        // Must delegate to scheduleWithFixedDelay: routing this through scheduleAtFixedRate would
        // silently turn the delay between runs into a start-to-start period.
        return delegate.scheduleWithFixedDelay(new InstrumentedRunnable(command), initialDelay, delay, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return delegate.awaitTermination(timeout, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        submitted.mark();
        return delegate.submit(new InstrumentedCallable<T>(task));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        submitted.mark();
        return delegate.submit(new InstrumentedRunnable(task), result);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Future<?> submit(Runnable task) {
        submitted.mark();
        return delegate.submit(new InstrumentedRunnable(task));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAll(instrumented);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAll(instrumented, timeout, unit);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAny(instrumented);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        submitted.mark(tasks.size());
        Collection<? extends Callable<T>> instrumented = instrument(tasks);
        return delegate.invokeAny(instrumented, timeout, unit);
    }

    /**
     * Wraps every callable of the given collection in an {@link InstrumentedCallable}.
     *
     * @param tasks the callables to wrap
     * @param <T>   the result type of the callables
     * @return the wrapped callables, in the iteration order of {@code tasks}
     */
    private <T> Collection<? extends Callable<T>> instrument(Collection<? extends Callable<T>> tasks) {
        final List<InstrumentedCallable<T>> instrumented = new ArrayList<InstrumentedCallable<T>>(tasks.size());
        for (Callable<T> task : tasks) {
            // Parameterized (not raw) so the element type is checked by the compiler.
            instrumented.add(new InstrumentedCallable<T>(task));
        }
        return instrumented;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable command) {
        submitted.mark();
        delegate.execute(new InstrumentedRunnable(command));
    }

    /** Runnable wrapper that updates the running/duration/completed metrics around the task. */
    private class InstrumentedRunnable implements Runnable {
        private final Runnable command;

        InstrumentedRunnable(Runnable command) {
            this.command = command;
        }

        @Override
        public void run() {
            running.inc();
            final Timer.Context context = duration.time();
            try {
                command.run();
            } finally {
                // Executed even when the command throws, so the metrics stay consistent.
                context.stop();
                running.dec();
                completed.mark();
            }
        }
    }

    /**
     * Runnable wrapper for fixed-rate tasks. In addition to the basic metrics it records whether
     * a run exceeded the scheduling period and how much of the period the run consumed.
     */
    private class InstrumentedPeriodicRunnable implements Runnable {
        private final Runnable command;
        private final long periodInNanos;

        InstrumentedPeriodicRunnable(Runnable command, long period, TimeUnit unit) {
            this.command = command;
            this.periodInNanos = unit.toNanos(period);
        }

        @Override
        public void run() {
            running.inc();
            final Timer.Context context = duration.time();
            try {
                command.run();
            } finally {
                final long elapsed = context.stop();
                running.dec();
                completed.mark();
                if (elapsed > periodInNanos) {
                    scheduledOverrun.inc();
                }
                // Integer percentage of the period spent running; above 100 means an overrun.
                percentOfPeriod.update((100L * elapsed) / periodInNanos);
            }
        }
    }

    /** Callable wrapper that updates the running/duration/completed metrics around the call. */
    private class InstrumentedCallable<T> implements Callable<T> {
        private final Callable<T> task;

        InstrumentedCallable(Callable<T> task) {
            this.task = task;
        }

        @Override
        public T call() throws Exception {
            running.inc();
            final Timer.Context context = duration.time();
            try {
                return task.call();
            } finally {
                // Executed even when the task throws, so the metrics stay consistent.
                context.stop();
                running.dec();
                completed.mark();
            }
        }
    }
}

metrics-healthchecks

HealthCheck.java

健康监测抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package io.dropwizard.metrics.health;

/**
 * A health check for a component of your application.
 * <p>
 * Subclasses implement {@link #check()}; callers invoke {@link #execute()}, which converts any
 * exception thrown by the check into an unhealthy {@link Result}.
 */
public abstract class HealthCheck {
    /**
     * The result of a {@link HealthCheck} being run. It can be healthy (with an optional message)
     * or unhealthy (with either an error message or a thrown exception).
     */
    public static class Result {
        private static final Result HEALTHY = new Result(true, null, null);
        private static final int PRIME = 31;

        /**
         * Returns the shared healthy {@link Result} that carries no message.
         *
         * @return a healthy {@link Result} with no additional message
         */
        public static Result healthy() {
            return HEALTHY;
        }

        /**
         * Returns a healthy {@link Result} with an additional message.
         *
         * @param message an informative message
         * @return a healthy {@link Result} with an additional message
         */
        public static Result healthy(String message) {
            return new Result(true, message, null);
        }

        /**
         * Returns a healthy {@link Result} with a formatted message.
         * <p>
         * Message formatting follows the same rules as {@link String#format(String, Object...)}.
         *
         * @param message a message format
         * @param args    the arguments apply to the message format
         * @return a healthy {@link Result} with an additional message
         * @see String#format(String, Object...)
         */
        public static Result healthy(String message, Object... args) {
            final String formatted = String.format(message, args);
            return healthy(formatted);
        }

        /**
         * Returns an unhealthy {@link Result} with the given message.
         *
         * @param message an informative message describing how the health check failed
         * @return an unhealthy {@link Result} with the given message
         */
        public static Result unhealthy(String message) {
            return new Result(false, message, null);
        }

        /**
         * Returns an unhealthy {@link Result} with a formatted message.
         * <p>
         * Message formatting follows the same rules as {@link String#format(String, Object...)}.
         *
         * @param message a message format
         * @param args    the arguments apply to the message format
         * @return an unhealthy {@link Result} with an additional message
         * @see String#format(String, Object...)
         */
        public static Result unhealthy(String message, Object... args) {
            final String formatted = String.format(message, args);
            return unhealthy(formatted);
        }

        /**
         * Returns an unhealthy {@link Result} with the given error; the error's message becomes
         * the result's message.
         *
         * @param error an exception thrown during the health check
         * @return an unhealthy {@link Result} with the given error
         */
        public static Result unhealthy(Throwable error) {
            return new Result(false, error.getMessage(), error);
        }

        /** Whether the checked component is healthy. */
        private final boolean healthy;
        /** Optional descriptive message, may be {@code null}. */
        private final String message;
        /** Optional exception behind an unhealthy result, may be {@code null}. */
        private final Throwable error;

        protected Result(boolean isHealthy, String message, Throwable error) {
            this.healthy = isHealthy;
            this.message = message;
            this.error = error;
        }

        /**
         * Returns {@code true} if the result indicates the component is healthy; {@code false}
         * otherwise.
         *
         * @return {@code true} if the result indicates the component is healthy
         */
        public boolean isHealthy() {
            return healthy;
        }

        /**
         * Returns any additional message for the result, or {@code null} if the result has no
         * message.
         *
         * @return any additional message for the result, or {@code null}
         */
        public String getMessage() {
            return message;
        }

        /**
         * Returns any exception for the result, or {@code null} if the result has no exception.
         *
         * @return any exception for the result, or {@code null}
         */
        public Throwable getError() {
            return error;
        }

        /**
         * Two results are equal when they are of the same class and agree on the health flag,
         * the error and the message.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            final Result other = (Result) o;
            return healthy == other.healthy
                    && java.util.Objects.equals(error, other.error)
                    && java.util.Objects.equals(message, other.message);
        }

        @Override
        public int hashCode() {
            final int flagHash = healthy ? 1 : 0;
            final int messageHash = message == null ? 0 : message.hashCode();
            final int errorHash = error == null ? 0 : error.hashCode();
            return PRIME * (PRIME * flagHash + messageHash) + errorHash;
        }

        /** Renders the health flag, plus the message and error when they are present. */
        @Override
        public String toString() {
            String text = "Result{isHealthy=" + healthy;
            if (message != null) {
                text += ", message=" + message;
            }
            if (error != null) {
                text += ", error=" + error;
            }
            return text + '}';
        }
    }

    /**
     * Perform a check of the application component.
     *
     * @return if the component is healthy, a healthy {@link Result}; otherwise, an unhealthy {@link
     * Result} with a descriptive error message or exception
     * @throws Exception if there is an unhandled error during the health check; this will result in
     *                   a failed health check
     */
    protected abstract Result check() throws Exception;

    /**
     * Executes the health check, catching and handling any exceptions raised by {@link #check()}.
     *
     * @return if the component is healthy, a healthy {@link Result}; otherwise, an unhealthy {@link
     * Result} with a descriptive error message or exception
     */
    public Result execute() {
        Result outcome;
        try {
            outcome = check();
        } catch (Exception e) {
            // A throwing check is reported as unhealthy rather than propagated.
            outcome = Result.unhealthy(e);
        }
        return outcome;
    }
}

HealthCheckRegistry.java

healthCheck的注册类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package io.dropwizard.metrics.health;

import static io.dropwizard.metrics.health.HealthCheck.Result;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;

/**
 * A registry for health checks.
 * <p>
 * Health checks are kept in a concurrent map keyed by name and can be run one at a time, all
 * sequentially, or all in parallel on a caller-supplied executor.
 */
public class HealthCheckRegistry {
    private static final Logger LOGGER = LoggerFactory.getLogger(HealthCheckRegistry.class);

    /** Registered health checks, keyed by name. */
    private final ConcurrentMap<String, HealthCheck> healthChecks;

    /**
     * Creates a new {@link HealthCheckRegistry}.
     */
    public HealthCheckRegistry() {
        this.healthChecks = new ConcurrentHashMap<String, HealthCheck>();
    }

    /**
     * Registers an application {@link HealthCheck}. If a health check is already registered under
     * {@code name}, the existing one is kept and this call has no effect.
     *
     * @param name        the name of the health check
     * @param healthCheck the {@link HealthCheck} instance
     */
    public void register(String name, HealthCheck healthCheck) {
        healthChecks.putIfAbsent(name, healthCheck);
    }

    /**
     * Unregisters the application {@link HealthCheck} with the given name.
     *
     * @param name the name of the {@link HealthCheck} instance
     */
    public void unregister(String name) {
        healthChecks.remove(name);
    }

    /**
     * Returns a set of the names of all registered health checks. The set is an unmodifiable,
     * sorted snapshot taken at the time of the call.
     *
     * @return the names of all registered health checks
     */
    public SortedSet<String> getNames() {
        return Collections.unmodifiableSortedSet(new TreeSet<String>(healthChecks.keySet()));
    }

    /**
     * Runs the health check with the given name.
     *
     * @param name the health check's name
     * @return the result of the health check
     * @throws NoSuchElementException if there is no health check with the given name
     */
    public HealthCheck.Result runHealthCheck(String name) throws NoSuchElementException {
        final HealthCheck healthCheck = healthChecks.get(name);
        if (healthCheck == null) {
            throw new NoSuchElementException("No health check named " + name + " exists");
        }
        return healthCheck.execute();
    }

    /**
     * Runs the registered health checks sequentially on the calling thread and returns a map of
     * the results.
     *
     * @return an unmodifiable map of the health check results, sorted by name
     */
    public SortedMap<String, HealthCheck.Result> runHealthChecks() {
        final SortedMap<String, HealthCheck.Result> results = new TreeMap<String, HealthCheck.Result>();
        for (Map.Entry<String, HealthCheck> entry : healthChecks.entrySet()) {
            final Result result = entry.getValue().execute();
            results.put(entry.getKey(), result);
        }
        return Collections.unmodifiableSortedMap(results);
    }

    /**
     * Runs the registered health checks in parallel and returns a map of the results.
     * <p>
     * Every check is submitted to {@code executor}; the calling thread then blocks until each
     * submitted check has produced a result. A check whose future fails is reported as unhealthy
     * with the failure as its error. If the calling thread is interrupted while waiting, the
     * affected check is reported as unhealthy, waiting continues for the remaining checks, and the
     * thread's interrupt status is restored before this method returns.
     *
     * @param executor object to launch and track health checks progress
     * @return an unmodifiable map of the health check results, sorted by name
     */
    public SortedMap<String, HealthCheck.Result> runHealthChecks(ExecutorService executor) {
        final Map<String, Future<HealthCheck.Result>> futures = new HashMap<String, Future<Result>>();

        for (final Map.Entry<String, HealthCheck> entry : healthChecks.entrySet()) {
            futures.put(entry.getKey(), executor.submit(new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return entry.getValue().execute();
                }
            }));
        }

        final SortedMap<String, HealthCheck.Result> results = new TreeMap<String, HealthCheck.Result>();
        boolean interrupted = false;
        try {
            for (Map.Entry<String, Future<Result>> entry : futures.entrySet()) {
                try {
                    results.put(entry.getKey(), entry.getValue().get());
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Future.get() cleared the interrupt flag; remember to restore it below.
                        interrupted = true;
                    }
                    LOGGER.warn("Error executing health check {}", entry.getKey(), e);
                    results.put(entry.getKey(), HealthCheck.Result.unhealthy(e));
                }
            }
        } finally {
            if (interrupted) {
                // Do not swallow the interruption: let the caller observe it.
                Thread.currentThread().interrupt();
            }
        }
        return Collections.unmodifiableSortedMap(results);
    }
}

SharedHealthCheckRegistries.java

HealthCheckRegistry的全局Map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package io.dropwizard.metrics.health;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * A map of shared, named health registries, held in a single static map.
 */
public class SharedHealthCheckRegistries {
    /** The shared registries, keyed by name. */
    private static final ConcurrentMap<String, HealthCheckRegistry> REGISTRIES = new ConcurrentHashMap<String, HealthCheckRegistry>();

    private SharedHealthCheckRegistries() {
        // static access only
    }

    /** Removes every shared registry. */
    public static void clear() {
        REGISTRIES.clear();
    }

    /**
     * Returns the names of the shared registries.
     *
     * @return the key set of the underlying map
     */
    public static Set<String> names() {
        return REGISTRIES.keySet();
    }

    /**
     * Removes the shared registry stored under the given name, if any.
     *
     * @param key the name of the registry to remove
     */
    public static void remove(String key) {
        REGISTRIES.remove(key);
    }

    /**
     * Stores the given registry under the given name unless a registry already exists there.
     *
     * @param name     the name to store the registry under
     * @param registry the registry to store
     * @return the registry previously stored under {@code name}, or {@code null} if there was none
     *         and {@code registry} has been stored
     */
    public static HealthCheckRegistry add(String name, HealthCheckRegistry registry) {
        return REGISTRIES.putIfAbsent(name, registry);
    }

    /**
     * Returns the shared registry stored under the given name, creating and storing a new one if
     * none exists yet.
     *
     * @param name the name of the registry
     * @return the registry stored under {@code name}
     */
    public static HealthCheckRegistry getOrCreate(String name) {
        final HealthCheckRegistry current = REGISTRIES.get(name);
        if (current != null) {
            return current;
        }
        final HealthCheckRegistry fresh = new HealthCheckRegistry();
        // add() returns a non-null registry only when another thread stored one first.
        final HealthCheckRegistry winner = add(name, fresh);
        return winner == null ? fresh : winner;
    }
}

ThreadDeadlockHealthCheck.java

线程死锁健康检查:没有线程发生死锁时返回健康状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package io.dropwizard.metrics.health.jvm;

import io.dropwizard.metrics.jvm.ThreadDeadlockDetector;

import io.dropwizard.metrics.health.HealthCheck;

import java.util.Set;

/**
 * A health check which returns healthy if no threads are deadlocked.
 */
public class ThreadDeadlockHealthCheck extends HealthCheck {
    /** Supplies the set of currently deadlocked threads. */
    private final ThreadDeadlockDetector detector;

    /**
     * Creates a new health check backed by a newly created {@link ThreadDeadlockDetector}.
     */
    public ThreadDeadlockHealthCheck() {
        this(new ThreadDeadlockDetector());
    }

    /**
     * Creates a new health check with the given detector.
     *
     * @param detector a thread deadlock detector
     */
    public ThreadDeadlockHealthCheck(ThreadDeadlockDetector detector) {
        this.detector = detector;
    }

    /**
     * Asks the detector for deadlocked threads.
     *
     * @return a healthy result when the detector reports none; otherwise an unhealthy result whose
     *         message is the string form of the reported set
     */
    @Override
    protected Result check() throws Exception {
        final Set<String> deadlocked = detector.getDeadlockedThreads();
        return deadlocked.isEmpty()
                ? Result.healthy()
                : Result.unhealthy(deadlocked.toString());
    }
}

metrics-httpclient

HttpClientMetricNameStrategies.java

提供预定义的HttpClientMetricNameStrategy实现:METHOD_ONLY、HOST_AND_METHOD、QUERYLESS_URL_AND_METHOD

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package io.dropwizard.metrics.httpclient;

import static io.dropwizard.metrics.MetricRegistry.name;

import org.apache.http.HttpRequest;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpRequestWrapper;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;

import io.dropwizard.metrics.MetricName;

import java.net.URI;
import java.net.URISyntaxException;

/**
 * Predefined {@link HttpClientMetricNameStrategy} implementations that differ in how much of
 * the request goes into the metric name.
 */
public class HttpClientMetricNameStrategies {

    /**
     * Names the metric after the given name and the HTTP method only.
     */
    public static final HttpClientMetricNameStrategy METHOD_ONLY =
            new HttpClientMetricNameStrategy() {
                @Override
                public MetricName getNameFor(String name, HttpRequest request) {
                    return name(HttpClient.class,
                            name,
                            methodNameString(request));
                }
            };

    /**
     * Names the metric after the given name, the host of the request URI and the HTTP method.
     */
    public static final HttpClientMetricNameStrategy HOST_AND_METHOD =
            new HttpClientMetricNameStrategy() {
                @Override
                public MetricName getNameFor(String name, HttpRequest request) {
                    return name(HttpClient.class,
                            name,
                            requestURI(request).getHost(),
                            methodNameString(request));
                }
            };

    /**
     * Names the metric after the given name, the request URI with its query removed, and the HTTP
     * method. Throws {@link IllegalArgumentException} if the query-less URI cannot be built.
     */
    public static final HttpClientMetricNameStrategy QUERYLESS_URL_AND_METHOD =
            new HttpClientMetricNameStrategy() {
                @Override
                public MetricName getNameFor(String name, HttpRequest request) {
                    try {
                        final URIBuilder url = new URIBuilder(requestURI(request));
                        return name(HttpClient.class,
                                name,
                                url.removeQuery().build().toString(),
                                methodNameString(request));
                    } catch (URISyntaxException e) {
                        throw new IllegalArgumentException(e);
                    }
                }
            };

    /**
     * Builds the method part of a metric name, e.g. "get-requests".
     *
     * @param request the request whose method is used
     * @return the lower-cased HTTP method followed by "-requests"
     */
    private static String methodNameString(HttpRequest request) {
        // Locale.ROOT keeps the metric name independent of the JVM's default locale
        // (under a Turkish locale a plain toLowerCase() maps 'I' to a dotless 'ı').
        return request.getRequestLine().getMethod().toLowerCase(java.util.Locale.ROOT) + "-requests";
    }

    /**
     * Resolves the URI of a request.
     *
     * @param request the request to inspect
     * @return for an {@link HttpRequestWrapper}, the URI of the wrapped original request; for an
     *         {@link HttpUriRequest}, its URI; otherwise a URI created from the request line
     */
    private static URI requestURI(HttpRequest request) {
        if (request instanceof HttpRequestWrapper) {
            return requestURI(((HttpRequestWrapper) request).getOriginal());
        }

        return (request instanceof HttpUriRequest) ?
                ((HttpUriRequest) request).getURI() :
                URI.create(request.getRequestLine().getUri());
    }
}

HttpClientMetricNameStrategy.java

接口:创建httpclient的MetricName

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package io.dropwizard.metrics.httpclient;

import static io.dropwizard.metrics.MetricRegistry.name;

import org.apache.http.HttpRequest;
import org.apache.http.client.HttpClient;

import io.dropwizard.metrics.MetricName;

/**
 * Strategy that decides which {@link MetricName} an HttpClient measurement is recorded under.
 */
public interface HttpClientMetricNameStrategy {

    /**
     * Produces the metric name for a request.
     *
     * @param name    the caller-supplied client name
     * @param request the request being measured
     * @return the metric name to record the request under
     */
    MetricName getNameFor(String name, HttpRequest request);

    /**
     * Produces the metric name for a failure. The default combines {@code HttpClient.class},
     * the client name and the simple class name of the exception.
     *
     * @param name      the caller-supplied client name
     * @param exception the exception being counted
     * @return the metric name to record the exception under
     */
    default MetricName getNameFor(String name, Exception exception) {
        final String exceptionType = exception.getClass().getSimpleName();
        return name(HttpClient.class, name, exceptionType);
    }
}

InstrumentedHttpClientConnectionManager.java

监控http 连接工具,继承http 连接池
创建Gauge的4种连接监控注册对象:available-connections,leased-connections,max-connections,pending-connections

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package io.dropwizard.metrics.httpclient;

import static io.dropwizard.metrics.MetricRegistry.name;

import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.*;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.conn.SystemDefaultDnsResolver;

import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.MetricRegistry;

import java.util.concurrent.TimeUnit;

/**
 * A {@link HttpClientConnectionManager} which monitors the number of open connections.
 * <p>
 * It extends the pooling connection manager and, on construction, registers four gauges:
 * {@code available-connections}, {@code leased-connections}, {@code max-connections} and
 * {@code pending-connections}. {@link #shutdown()} removes them again.
 */
public class InstrumentedHttpClientConnectionManager extends PoolingHttpClientConnectionManager {

    /**
     * Builds the socket factory registry used when the caller does not supply one.
     *
     * @return a registry with the plain factory under {@code http} and the SSL factory under
     *         {@code https}
     */
    protected static Registry<ConnectionSocketFactory> getDefaultRegistry() {
        final RegistryBuilder<ConnectionSocketFactory> builder =
                RegistryBuilder.<ConnectionSocketFactory>create();
        builder.register("http", PlainConnectionSocketFactory.getSocketFactory());
        builder.register("https", SSLConnectionSocketFactory.getSocketFactory());
        return builder.build();
    }

    /** Registry the connection gauges are registered with and later removed from. */
    private final MetricRegistry metricsRegistry;
    /** Optional name component of the gauge names; the shorter constructors pass {@code null}. */
    private final String name;

    /**
     * Creates a manager that uses the default socket factory registry.
     *
     * @param metricRegistry the registry to publish the gauges to
     */
    public InstrumentedHttpClientConnectionManager(MetricRegistry metricRegistry) {
        this(metricRegistry, getDefaultRegistry());
    }

    /**
     * Creates a manager with a connection time-to-live of {@code -1} milliseconds.
     *
     * @param metricsRegistry       the registry to publish the gauges to
     * @param socketFactoryRegistry the socket factories to use
     */
    public InstrumentedHttpClientConnectionManager(MetricRegistry metricsRegistry,
                                                   Registry<ConnectionSocketFactory> socketFactoryRegistry) {
        this(metricsRegistry, socketFactoryRegistry, -1, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a manager with the system default DNS resolver, no connection factory, no
     * scheme port resolver and no name.
     *
     * @param metricsRegistry       the registry to publish the gauges to
     * @param socketFactoryRegistry the socket factories to use
     * @param connTTL               the connection time-to-live
     * @param connTTLTimeUnit       the unit of {@code connTTL}
     */
    public InstrumentedHttpClientConnectionManager(MetricRegistry metricsRegistry,
                                                   Registry<ConnectionSocketFactory> socketFactoryRegistry,
                                                   long connTTL,
                                                   TimeUnit connTTLTimeUnit) {
        this(metricsRegistry, socketFactoryRegistry, null, null, SystemDefaultDnsResolver.INSTANCE,
                connTTL, connTTLTimeUnit, null);
    }

    /**
     * Creates the pooling manager and registers the four connection gauges.
     *
     * @param metricsRegistry       the registry to publish the gauges to
     * @param socketFactoryRegistry the socket factories to use
     * @param connFactory           the connection factory handed to the superclass
     * @param schemePortResolver    the scheme port resolver handed to the superclass
     * @param dnsResolver           the DNS resolver handed to the superclass
     * @param connTTL               the connection time-to-live
     * @param connTTLTimeUnit       the unit of {@code connTTL}
     * @param name                  the name component used in the gauge names
     */
    public InstrumentedHttpClientConnectionManager(MetricRegistry metricsRegistry,
                                                   Registry<ConnectionSocketFactory> socketFactoryRegistry,
                                                   HttpConnectionFactory<HttpRoute,ManagedHttpClientConnection> connFactory,
                                                   SchemePortResolver schemePortResolver,
                                                   DnsResolver dnsResolver,
                                                   long connTTL,
                                                   TimeUnit connTTLTimeUnit,
                                                   String name) {
        // Let the pooling manager set itself up first.
        super(socketFactoryRegistry, connFactory, schemePortResolver, dnsResolver, connTTL, connTTLTimeUnit);
        this.metricsRegistry = metricsRegistry;
        this.name = name;

        // Connections currently available in the pool.
        metricsRegistry.register(name(HttpClientConnectionManager.class, name, "available-connections"),
                new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        // this acquires a lock on the connection pool; remove if contention sucks
                        return getTotalStats().getAvailable();
                    }
                });

        // Connections currently leased out of the pool.
        metricsRegistry.register(name(HttpClientConnectionManager.class, name, "leased-connections"),
                new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        // this acquires a lock on the connection pool; remove if contention sucks
                        return getTotalStats().getLeased();
                    }
                });

        // Maximum number of connections the pool reports.
        metricsRegistry.register(name(HttpClientConnectionManager.class, name, "max-connections"),
                new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        // this acquires a lock on the connection pool; remove if contention sucks
                        return getTotalStats().getMax();
                    }
                });

        // Connection requests reported as pending by the pool.
        metricsRegistry.register(name(HttpClientConnectionManager.class, name, "pending-connections"),
                new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        // this acquires a lock on the connection pool; remove if contention sucks
                        return getTotalStats().getPending();
                    }
                });
    }

    /**
     * Shuts the pool down and then removes the four connection gauges from the registry.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        final String[] gaugeSuffixes = {
                "available-connections",
                "leased-connections",
                "max-connections",
                "pending-connections"
        };
        for (final String suffix : gaugeSuffixes) {
            metricsRegistry.remove(name(HttpClientConnectionManager.class, name, suffix));
        }
    }
}

InstrumentedHttpRequestExecutor.java

httpRequest执行类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package io.dropwizard.metrics.httpclient;

import org.apache.http.HttpClientConnection;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpRequestExecutor;

import io.dropwizard.metrics.Meter;
import io.dropwizard.metrics.MetricRegistry;
import io.dropwizard.metrics.Timer;

import java.io.IOException;

/**
 * An {@link HttpRequestExecutor} that times every executed request and counts the
 * exceptions raised while executing it.
 */
public class InstrumentedHttpRequestExecutor extends HttpRequestExecutor {
    /** Registry the timers and meters are looked up in. */
    private final MetricRegistry registry;
    /** Strategy that turns a request or an exception into a metric name. */
    private final HttpClientMetricNameStrategy metricNameStrategy;
    /** Name handed to the naming strategy; the two-argument constructor passes {@code null}. */
    private final String name;

    /**
     * Creates an executor without a name.
     *
     * @param registry           the registry to record metrics in
     * @param metricNameStrategy the naming strategy
     */
    public InstrumentedHttpRequestExecutor(MetricRegistry registry,
                                           HttpClientMetricNameStrategy metricNameStrategy) {
        this(registry, metricNameStrategy, null);
    }

    /**
     * Creates an executor using {@link HttpRequestExecutor#DEFAULT_WAIT_FOR_CONTINUE}.
     *
     * @param registry           the registry to record metrics in
     * @param metricNameStrategy the naming strategy
     * @param name               the name handed to the naming strategy
     */
    public InstrumentedHttpRequestExecutor(MetricRegistry registry,
                                           HttpClientMetricNameStrategy metricNameStrategy,
                                           String name) {
        this(registry, metricNameStrategy, name, HttpRequestExecutor.DEFAULT_WAIT_FOR_CONTINUE);
    }

    /**
     * Creates an executor.
     *
     * @param registry           the registry to record metrics in
     * @param metricNameStrategy the naming strategy
     * @param name               the name handed to the naming strategy
     * @param waitForContinue    passed through to the superclass constructor
     */
    public InstrumentedHttpRequestExecutor(MetricRegistry registry,
                                           HttpClientMetricNameStrategy metricNameStrategy,
                                           String name,
                                           int waitForContinue) {
        super(waitForContinue);
        this.registry = registry;
        this.name = name;
        this.metricNameStrategy = metricNameStrategy;
    }

    /**
     * Delegates to the superclass while a timer is running. An {@link HttpException} or
     * {@link IOException} is marked on the matching meter and rethrown; the timer is stopped
     * in every case.
     *
     * @param request the request to execute
     * @param conn    the connection to execute it on
     * @param context the execution context
     * @return the response produced by the superclass
     * @throws HttpException if the superclass throws it
     * @throws IOException   if the superclass throws it
     */
    @Override
    public HttpResponse execute(HttpRequest request, HttpClientConnection conn, HttpContext context) throws HttpException, IOException {
        final Timer.Context timing = timerFor(request).time();
        try {
            return super.execute(request, conn, context);
        } catch (HttpException | IOException failure) {
            meterFor(failure).mark();
            throw failure;
        } finally {
            timing.stop();
        }
    }

    /**
     * Looks up the timer for a request under the name chosen by the naming strategy.
     */
    private Timer timerFor(HttpRequest request) {
        return registry.timer(metricNameStrategy.getNameFor(name, request));
    }

    /**
     * Looks up the meter for an exception under the name chosen by the naming strategy.
     */
    private Meter meterFor(Exception e) {
        return registry.meter(metricNameStrategy.getNameFor(name, e));
    }
}

InstrumentedHttpClients.java

httpClient监测工具,创建监测的HttpClientBuilder对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package io.dropwizard.metrics.httpclient;

import static io.dropwizard.metrics.httpclient.HttpClientMetricNameStrategies.METHOD_ONLY;

import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;

import io.dropwizard.metrics.MetricRegistry;

/**
 * Static factories for HttpClient instances and builders wired with an
 * {@link InstrumentedHttpRequestExecutor} and an
 * {@link InstrumentedHttpClientConnectionManager}.
 */
public class InstrumentedHttpClients {
    private InstrumentedHttpClients() {
        // static factory holder; not instantiable
    }

    /**
     * Builds a client using the {@code METHOD_ONLY} naming strategy.
     *
     * @param metricRegistry the registry to record metrics in
     * @return the built client
     */
    public static CloseableHttpClient createDefault(MetricRegistry metricRegistry) {
        return createDefault(metricRegistry, METHOD_ONLY);
    }

    /**
     * Builds a client using the given naming strategy.
     *
     * @param metricRegistry     the registry to record metrics in
     * @param metricNameStrategy the naming strategy
     * @return the built client
     */
    public static CloseableHttpClient createDefault(MetricRegistry metricRegistry,
                                                    HttpClientMetricNameStrategy metricNameStrategy) {
        final HttpClientBuilder builder = custom(metricRegistry, metricNameStrategy);
        return builder.build();
    }

    /**
     * Returns a builder using the {@code METHOD_ONLY} naming strategy.
     *
     * @param metricRegistry the registry to record metrics in
     * @return a builder the caller can customise further
     */
    public static HttpClientBuilder custom(MetricRegistry metricRegistry) {
        return custom(metricRegistry, METHOD_ONLY);
    }

    /**
     * Returns a builder whose request executor is an {@link InstrumentedHttpRequestExecutor}
     * and whose connection manager is an {@link InstrumentedHttpClientConnectionManager}.
     *
     * @param metricRegistry     the registry to record metrics in
     * @param metricNameStrategy the naming strategy for request metrics
     * @return a builder the caller can customise further
     */
    public static HttpClientBuilder custom(MetricRegistry metricRegistry,
                                           HttpClientMetricNameStrategy metricNameStrategy) {
        final HttpClientBuilder builder = HttpClientBuilder.create();
        builder.setRequestExecutor(new InstrumentedHttpRequestExecutor(metricRegistry, metricNameStrategy));
        builder.setConnectionManager(new InstrumentedHttpClientConnectionManager(metricRegistry));
        return builder;
    }
}

metrics-jvm

BufferPoolMetricSet.java

通过MBeanServer统计JVM直接(direct)与映射(mapped)缓冲池的数量、已用内存和总容量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package io.dropwizard.metrics.jvm;

import static io.dropwizard.metrics.MetricRegistry.name;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.dropwizard.metrics.JmxAttributeGauge;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricName;
import io.dropwizard.metrics.MetricSet;

import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * A set of gauges for the count, usage, and capacity of the JVM's direct and mapped buffer pools.
 * The values are read as JMX attributes through an {@link MBeanServer}.
 *
 * <p>
 * These JMX objects are only available on Java 7 and above.
 */
public class BufferPoolMetricSet implements MetricSet {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferPoolMetricSet.class);

    // ATTRIBUTES[i] is the JMX attribute published under the gauge name NAMES[i].
    private static final String[] ATTRIBUTES = {"Count", "MemoryUsed", "TotalCapacity"};
    private static final String[] NAMES = {"count", "used", "capacity"};
    private static final String[] POOLS = {"direct", "mapped"};

    private final MBeanServer mBeanServer;

    /**
     * @param mBeanServer the server the buffer pool MBeans are looked up in
     */
    public BufferPoolMetricSet(MBeanServer mBeanServer) {
        this.mBeanServer = mBeanServer;
    }

    /**
     * Builds one gauge per pool and attribute. A pool/attribute pair whose MBean lookup
     * raises a {@link JMException} is left out and a debug message is logged.
     *
     * @return an unmodifiable map from metric name to gauge
     */
    @Override
    public Map<MetricName, Metric> getMetrics() {
        final Map<MetricName, Metric> result = new HashMap<>();
        for (final String pool : POOLS) {
            int index = 0;
            for (final String attribute : ATTRIBUTES) {
                final String gaugeName = NAMES[index++];
                try {
                    final ObjectName objectName = new ObjectName("java.nio:type=BufferPool,name=" + pool);
                    // Probe the bean first so that a missing pool surfaces here as a JMException.
                    mBeanServer.getMBeanInfo(objectName);
                    result.put(name(pool, gaugeName),
                            new JmxAttributeGauge(mBeanServer, objectName, attribute));
                } catch (JMException ignored) {
                    LOGGER.debug("Unable to load buffer pool MBeans, possibly running on Java 6");
                }
            }
        }
        return Collections.unmodifiableMap(result);
    }
}

CachedThreadStatesGaugeSet.java

使用ThreadMXBean创建指定时间内的缓存信息,存入gauges中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package io.dropwizard.metrics.jvm;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit;

import io.dropwizard.metrics.CachedGauge;

/**
 * A variation of ThreadStatesGaugeSet that caches the ThreadInfo[] objects for
 * a given interval.
 */
public class CachedThreadStatesGaugeSet extends ThreadStatesGaugeSet {
    /** Cached holder for the thread info array loaded from the superclass. */
    private final CachedGauge<ThreadInfo[]> threadInfo;

    /**
     * Creates a new set of gauges using the default MXBeans.
     * Caches the information for the given interval and time unit.
     *
     * @param interval cache interval
     * @param unit     cache interval time unit
     */
    public CachedThreadStatesGaugeSet(long interval, TimeUnit unit) {
        this(ManagementFactory.getThreadMXBean(), new ThreadDeadlockDetector(), interval, unit);
    }

    /**
     * Creates a new set of gauges using the given MXBean and detector.
     * Caches the information for the given interval and time unit.
     *
     * @param threadMXBean     a thread MXBean
     * @param deadlockDetector a deadlock detector
     * @param interval         cache interval
     * @param unit             cache interval time unit
     */
    public CachedThreadStatesGaugeSet(final ThreadMXBean threadMXBean, ThreadDeadlockDetector deadlockDetector,
                                      long interval, TimeUnit unit) {
        super(threadMXBean, deadlockDetector);
        this.threadInfo = new CachedGauge<ThreadInfo[]>(interval, unit) {
            @Override
            protected ThreadInfo[] loadValue() {
                // Go to the superclass explicitly: this class's override reads from the cache.
                return CachedThreadStatesGaugeSet.super.getThreadInfo();
            }
        };
    }

    /**
     * Returns the thread info array held by the cached gauge.
     *
     * @return the cached thread info array
     */
    @Override
    ThreadInfo[] getThreadInfo() {
        return this.threadInfo.getValue();
    }

}

ClassLoadingGaugeSet.java

JVM classloader 使用率,class load 数目和 unload 数目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package io.dropwizard.metrics.jvm;

import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricName;
import io.dropwizard.metrics.MetricSet;

import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.Map;

/**
 * A set of gauges for JVM classloader usage: the total number of classes loaded and the
 * number of classes unloaded, both read from a {@link ClassLoadingMXBean}.
 */
public class ClassLoadingGaugeSet implements MetricSet {

    private final ClassLoadingMXBean mxBean;

    /**
     * Creates a new set of gauges using the platform class loading bean.
     */
    public ClassLoadingGaugeSet() {
        this(ManagementFactory.getClassLoadingMXBean());
    }

    /**
     * Creates a new set of gauges using the given bean.
     *
     * @param mxBean the class loading bean to read from
     */
    public ClassLoadingGaugeSet(ClassLoadingMXBean mxBean) {
        this.mxBean = mxBean;
    }

    /**
     * Builds the {@code loaded} and {@code unloaded} gauges.
     *
     * @return an unmodifiable map from metric name to gauge
     */
    @Override
    public Map<MetricName, Metric> getMetrics() {
        final Map<MetricName, Metric> gauges = new HashMap<MetricName, Metric>();

        gauges.put(MetricName.build("loaded"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getTotalLoadedClassCount();
            }
        });

        gauges.put(MetricName.build("unloaded"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getUnloadedClassCount();
            }
        });

        // Wrapped so callers cannot alter the returned map; fully qualified because this
        // file does not import java.util.Collections.
        return java.util.Collections.unmodifiableMap(gauges);
    }
}

FileDescriptorRatioGauge.java

通过OperatingSystemMXBean,统计文件描述符的使用比率(已打开数 / 最大数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package io.dropwizard.metrics.jvm;

import io.dropwizard.metrics.RatioGauge;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * A gauge for the ratio of used to total file descriptors, read from an
 * {@link OperatingSystemMXBean}.
 */
public class FileDescriptorRatioGauge extends RatioGauge {
    /** Public JDK interface that declares the file descriptor counters. */
    private static final String UNIX_OS_BEAN = "com.sun.management.UnixOperatingSystemMXBean";

    private final OperatingSystemMXBean os;

    /**
     * Creates a new gauge using the platform OS bean.
     */
    public FileDescriptorRatioGauge() {
        this(ManagementFactory.getOperatingSystemMXBean());
    }

    /**
     * Creates a new gauge using the given OS bean.
     *
     * @param os an {@link OperatingSystemMXBean}
     */
    public FileDescriptorRatioGauge(OperatingSystemMXBean os) {
        this.os = os;
    }

    /**
     * @return open descriptors over maximum descriptors, or a NaN/NaN ratio when the
     *         counters cannot be read reflectively
     */
    @Override
    protected Ratio getRatio() {
        try {
            return Ratio.of(invoke("getOpenFileDescriptorCount"), invoke("getMaxFileDescriptorCount"));
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            return Ratio.of(Double.NaN, Double.NaN);
        }
    }

    /**
     * Calls the no-argument, {@code long}-returning method {@code name} on the OS bean.
     * <p>
     * When the bean implements the public {@code UnixOperatingSystemMXBean} interface the
     * call goes through that interface, which needs no {@code setAccessible}; forcing access
     * on the JDK-internal implementation class is rejected on JVMs that strongly encapsulate
     * their internals. Any other bean is handled as before, via its declared method.
     */
    private long invoke(String name) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        final Class<?> unixBean = unixOperatingSystemBeanType();
        if (unixBean != null && unixBean.isInstance(os)) {
            return (Long) unixBean.getMethod(name).invoke(os);
        }
        final Method method = os.getClass().getDeclaredMethod(name);
        method.setAccessible(true);
        return (Long) method.invoke(os);
    }

    /**
     * Looks the Unix OS bean interface up by name so this class still loads on JVMs that do
     * not ship it.
     *
     * @return the interface, or {@code null} when this JVM does not provide it
     */
    private static Class<?> unixOperatingSystemBeanType() {
        try {
            return Class.forName(UNIX_OS_BEAN);
        } catch (ClassNotFoundException ignored) {
            return null;
        }
    }
}

GarbageCollectorMetricSet.java

通过GarbageCollectorMXBean实现运行时垃圾收集:收集数目,收集时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package io.dropwizard.metrics.jvm;

import static io.dropwizard.metrics.MetricRegistry.name;

import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricName;
import io.dropwizard.metrics.MetricSet;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.*;
import java.util.regex.Pattern;

/**
 * A set of gauges for the counts and elapsed times of garbage collections, read from
 * {@link GarbageCollectorMXBean}s.
 */
public class GarbageCollectorMetricSet implements MetricSet {
    /** One or more whitespace characters; each run in a collector name becomes a dash. */
    private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");

    private final List<GarbageCollectorMXBean> garbageCollectors;

    /**
     * Creates a new set of gauges for all discoverable garbage collectors.
     */
    public GarbageCollectorMetricSet() {
        this(ManagementFactory.getGarbageCollectorMXBeans());
    }

    /**
     * Creates a new set of gauges for the given collection of garbage collectors.
     * The collection is copied.
     *
     * @param garbageCollectors the garbage collectors
     */
    public GarbageCollectorMetricSet(Collection<GarbageCollectorMXBean> garbageCollectors) {
        this.garbageCollectors = new ArrayList<>(garbageCollectors);
    }

    /**
     * Builds a {@code count} and a {@code time} gauge per collector.
     *
     * @return an unmodifiable map from metric name to gauge
     */
    @Override
    public Map<MetricName, Metric> getMetrics() {
        final Map<MetricName, Metric> result = new HashMap<>();
        for (final GarbageCollectorMXBean collector : garbageCollectors) {
            final String collectorName = WHITESPACE.matcher(collector.getName()).replaceAll("-");

            final Gauge<Long> collections = new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return collector.getCollectionCount();
                }
            };
            final Gauge<Long> elapsed = new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return collector.getCollectionTime();
                }
            };

            result.put(name(collectorName, "count"), collections);
            result.put(name(collectorName, "time"), elapsed);
        }
        return Collections.unmodifiableMap(result);
    }
}

MemoryUsageGaugeSet.java

通过MemoryMXBean实现JVM 内存使用,堆状态,GC特殊内存池
包括:堆内存,非堆内存,内存池
获取监测的数据:总初始化数目,总被使用数目,最大数目,堆初始化,堆使用,堆最大值,非堆内存-初始化,非堆内存-使用,内存池数据的获取,使用比率,最大使用,初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package io.dropwizard.metrics.jvm;

import static io.dropwizard.metrics.MetricRegistry.name;

import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricName;
import io.dropwizard.metrics.MetricSet;
import io.dropwizard.metrics.RatioGauge;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * A set of gauges for JVM memory usage, including stats on heap vs. non-heap memory, plus
 * GC-specific memory pools.
 * <p>
 * Published gauges: {@code total.*}, {@code heap.*} and {@code non-heap.*}
 * (init, used, max, committed, plus usage ratios for heap and non-heap) and, per memory pool,
 * {@code usage}, {@code max}, {@code used}, {@code committed}, {@code init} and, where the
 * pool supports it, {@code used-after-gc}.
 */
public class MemoryUsageGaugeSet implements MetricSet {
    private static final Pattern WHITESPACE = Pattern.compile("[\\s]+");

    /** Bean providing heap and non-heap usage. */
    private final MemoryMXBean mxBean;
    /** Beans providing per-pool usage. */
    private final List<MemoryPoolMXBean> memoryPools;

    /**
     * Creates a new set of gauges using the platform memory beans.
     */
    public MemoryUsageGaugeSet() {
        this(ManagementFactory.getMemoryMXBean(),
                ManagementFactory.getMemoryPoolMXBeans());
    }

    /**
     * Creates a new set of gauges using the given beans. The pool collection is copied.
     *
     * @param mxBean      the memory bean
     * @param memoryPools the memory pool beans
     */
    public MemoryUsageGaugeSet(MemoryMXBean mxBean,
                               Collection<MemoryPoolMXBean> memoryPools) {
        this.mxBean = mxBean;
        this.memoryPools = new ArrayList<MemoryPoolMXBean>(memoryPools);
    }

    /**
     * Picks the denominator for a usage ratio: the maximum when it is defined, otherwise the
     * committed size. {@link MemoryUsage#getMax()} is -1 when the maximum is undefined, and
     * dividing by it would produce a negative ratio.
     */
    private static long ratioDenominator(MemoryUsage usage) {
        return usage.getMax() == -1 ? usage.getCommitted() : usage.getMax();
    }

    /**
     * Builds all memory gauges.
     *
     * @return an unmodifiable map from metric name to gauge
     */
    @Override
    public Map<MetricName, Metric> getMetrics() {
        final Map<MetricName, Metric> gauges = new HashMap<MetricName, Metric>();

        // Heap + non-heap initial size.
        gauges.put(MetricName.build("total.init"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getInit() +
                        mxBean.getNonHeapMemoryUsage().getInit();
            }
        });

        // Heap + non-heap used size.
        gauges.put(MetricName.build("total.used"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getUsed() +
                        mxBean.getNonHeapMemoryUsage().getUsed();
            }
        });

        // Heap + non-heap maximum; reported as -1 (undefined) when either part is undefined,
        // rather than folding the -1 marker into the sum.
        gauges.put(MetricName.build("total.max"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                final long heapMax = mxBean.getHeapMemoryUsage().getMax();
                final long nonHeapMax = mxBean.getNonHeapMemoryUsage().getMax();
                return (heapMax == -1 || nonHeapMax == -1) ? -1L : heapMax + nonHeapMax;
            }
        });

        // Heap + non-heap committed size.
        gauges.put(MetricName.build("total.committed"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getCommitted() +
                        mxBean.getNonHeapMemoryUsage().getCommitted();
            }
        });

        // Heap initial size.
        gauges.put(MetricName.build("heap.init"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getInit();
            }
        });

        // Heap used size.
        gauges.put(MetricName.build("heap.used"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getUsed();
            }
        });

        // Heap maximum size.
        gauges.put(MetricName.build("heap.max"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getMax();
            }
        });

        // Heap committed size.
        gauges.put(MetricName.build("heap.committed"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getHeapMemoryUsage().getCommitted();
            }
        });

        // Heap used / max (committed when max is undefined).
        gauges.put(MetricName.build("heap.usage"), new RatioGauge() {
            @Override
            protected Ratio getRatio() {
                final MemoryUsage usage = mxBean.getHeapMemoryUsage();
                return Ratio.of(usage.getUsed(), ratioDenominator(usage));
            }
        });

        // Non-heap initial size.
        gauges.put(MetricName.build("non-heap.init"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getNonHeapMemoryUsage().getInit();
            }
        });

        // Non-heap used size.
        gauges.put(MetricName.build("non-heap.used"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getNonHeapMemoryUsage().getUsed();
            }
        });

        // Non-heap maximum size.
        gauges.put(MetricName.build("non-heap.max"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getNonHeapMemoryUsage().getMax();
            }
        });

        // Non-heap committed size.
        gauges.put(MetricName.build("non-heap.committed"), new Gauge<Long>() {
            @Override
            public Long getValue() {
                return mxBean.getNonHeapMemoryUsage().getCommitted();
            }
        });

        // Non-heap used / max (committed when max is undefined).
        gauges.put(MetricName.build("non-heap.usage"), new RatioGauge() {
            @Override
            protected Ratio getRatio() {
                final MemoryUsage usage = mxBean.getNonHeapMemoryUsage();
                return Ratio.of(usage.getUsed(), ratioDenominator(usage));
            }
        });

        // Per-pool gauges, named pools.<pool name with whitespace runs replaced by '-'>.
        for (final MemoryPoolMXBean pool : memoryPools) {
            final MetricName poolName = name("pools", WHITESPACE.matcher(pool.getName()).replaceAll("-"));

            // Pool used / max (committed when max is undefined).
            gauges.put(poolName.resolve("usage"),
                    new RatioGauge() {
                        @Override
                        protected Ratio getRatio() {
                            final MemoryUsage usage = pool.getUsage();
                            return Ratio.of(usage.getUsed(), ratioDenominator(usage));
                        }
                    });

            // Pool maximum size.
            gauges.put(poolName.resolve("max"), new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return pool.getUsage().getMax();
                }
            });

            // Pool used size.
            gauges.put(poolName.resolve("used"), new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return pool.getUsage().getUsed();
                }
            });

            // Pool committed size.
            gauges.put(poolName.resolve("committed"), new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return pool.getUsage().getCommitted();
                }
            });

            // Only register GC usage metrics if the memory pool supports usage statistics.
            if (pool.getCollectionUsage() != null) {
                gauges.put(poolName.resolve("used-after-gc"), new Gauge<Long>() {
                    @Override
                    public Long getValue() {
                        return pool.getCollectionUsage().getUsed();
                    }
                });
            }

            // Pool initial size.
            gauges.put(poolName.resolve("init"), new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return pool.getUsage().getInit();
                }
            });
        }

        return Collections.unmodifiableMap(gauges);
    }
}

ThreadDeadlockDetector.java

通过ThreadMXBean,检测线程死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package io.dropwizard.metrics.jvm;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * A utility class for detecting deadlocked threads through a {@link ThreadMXBean}.
 */
public class ThreadDeadlockDetector {
    /** Maximum number of stack frames requested per deadlocked thread. */
    private static final int MAX_STACK_TRACE_DEPTH = 100;

    private final ThreadMXBean threads;

    /**
     * Creates a new detector using the platform thread bean.
     */
    public ThreadDeadlockDetector() {
        this(ManagementFactory.getThreadMXBean());
    }

    /**
     * Creates a new detector using the given {@link ThreadMXBean}.
     *
     * @param threads a {@link ThreadMXBean}
     */
    public ThreadDeadlockDetector(ThreadMXBean threads) {
        this.threads = threads;
    }

    /**
     * Returns a set of diagnostic stack traces for any deadlocked threads. If no threads are
     * deadlocked, returns an empty set.
     * <p>
     * A thread that was reported as deadlocked but has no thread info any more (the bean
     * returns a {@code null} element for it) is skipped.
     *
     * @return stack traces for deadlocked threads or an empty set
     */
    public Set<String> getDeadlockedThreads() {
        final long[] ids = threads.findDeadlockedThreads();
        if (ids == null) {
            return Collections.emptySet();
        }

        final String newline = System.lineSeparator();
        final Set<String> deadlocks = new HashSet<String>();
        for (ThreadInfo info : threads.getThreadInfo(ids, MAX_STACK_TRACE_DEPTH)) {
            if (info == null) {
                // The thread went away between the two bean calls; nothing to report for it.
                continue;
            }

            final StringBuilder stackTrace = new StringBuilder();
            for (StackTraceElement element : info.getStackTrace()) {
                stackTrace.append("\t at ")
                        .append(element.toString())
                        .append(newline);
            }

            deadlocks.add(
                    String.format("%s locked on %s (owned by %s):%n%s",
                            info.getThreadName(),
                            info.getLockName(),
                            info.getLockOwnerName(),
                            stackTrace.toString()
                    )
            );
        }
        return Collections.unmodifiableSet(deadlocks);
    }
}

ThreadDump.java

通过ThreadMXBean,获取线程快照

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package io.dropwizard.metrics.jvm;

import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.lang.management.LockInfo;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.nio.charset.Charset;

/**
 * A convenience class for getting a thread dump through a {@link ThreadMXBean}.
 */
public class ThreadDump {
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final ThreadMXBean threadMXBean;

    /**
     * @param threadMXBean the bean the thread information is read from
     */
    public ThreadDump(ThreadMXBean threadMXBean) {
        this.threadMXBean = threadMXBean;
    }

    /**
     * Dumps all of the threads' current information to an output stream.
     * <p>
     * The text is written as UTF-8 and the stream is flushed but not closed. Threads are
     * written in reverse order of the array returned by the bean.
     *
     * @param out an output stream
     */
    public void dump(OutputStream out) {
        // Request locked monitors and locked synchronizers along with the stack traces.
        final ThreadInfo[] threads = this.threadMXBean.dumpAllThreads(true, true);
        final PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, UTF_8));

        for (int ti = threads.length - 1; ti >= 0; ti--) {
            final ThreadInfo t = threads[ti];
            writer.printf("\"%s\" id=%d state=%s",
                    t.getThreadName(),
                    t.getThreadId(),
                    t.getThreadState());

            // The lock this thread is waiting on or blocked on, if any.
            final LockInfo lock = t.getLockInfo();
            if (lock != null) {
                if (t.getThreadState() != Thread.State.BLOCKED) {
                    writer.printf("%n - waiting on <0x%08x> (a %s)",
                            lock.getIdentityHashCode(),
                            lock.getClassName());
                    writer.printf("%n - locked <0x%08x> (a %s)",
                            lock.getIdentityHashCode(),
                            lock.getClassName());
                } else {
                    writer.printf("%n - waiting to lock <0x%08x> (a %s)",
                            lock.getIdentityHashCode(),
                            lock.getClassName());
                }
            }

            if (t.isSuspended()) {
                writer.print(" (suspended)");
            }

            if (t.isInNative()) {
                writer.print(" (running in native)");
            }

            writer.println();
            if (t.getLockOwnerName() != null) {
                writer.printf(" owned by %s id=%d%n", t.getLockOwnerName(), t.getLockOwnerId());
            }

            final StackTraceElement[] elements = t.getStackTrace();
            final MonitorInfo[] monitors = t.getLockedMonitors();

            for (int i = 0; i < elements.length; i++) {
                writer.printf(" at %s%n", elements[i]);
                // Print every monitor locked in this frame, including the first array entry.
                for (MonitorInfo monitor : monitors) {
                    if (monitor.getLockedStackDepth() == i) {
                        writer.printf(" - locked %s%n", monitor);
                    }
                }
            }
            writer.println();

            // Ownable synchronizers held by this thread.
            final LockInfo[] locks = t.getLockedSynchronizers();
            if (locks.length > 0) {
                writer.printf(" Locked synchronizers: count = %d%n", locks.length);
                for (LockInfo l : locks) {
                    writer.printf(" - %s%n", l);
                }
                writer.println();
            }
        }

        writer.println();
        writer.flush();
    }
}

ThreadStatesGaugeSet.java

通过ThreadMXBean,ThreadDeadlockDetector检测状态、死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package io.dropwizard.metrics.jvm;

import static io.dropwizard.metrics.MetricRegistry.name;

import io.dropwizard.metrics.Gauge;
import io.dropwizard.metrics.Metric;
import io.dropwizard.metrics.MetricName;
import io.dropwizard.metrics.MetricSet;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * A set of gauges for the number of threads in their various states and deadlock detection.
 * <p>
 * Thread counts are read from a {@link ThreadMXBean}; deadlocked threads are reported by a
 * {@link ThreadDeadlockDetector}.
 */
public class ThreadStatesGaugeSet implements MetricSet {

    // do not compute stack traces.
    private final static int STACK_TRACE_DEPTH = 0;

    private final ThreadMXBean threads;
    private final ThreadDeadlockDetector deadlockDetector;

    /**
     * Creates a new set of gauges using the default MXBeans.
     */
    public ThreadStatesGaugeSet() {
        this(ManagementFactory.getThreadMXBean(), new ThreadDeadlockDetector());
    }

    /**
     * Creates a new set of gauges using the given MXBean and detector.
     *
     * @param threads          a thread MXBean
     * @param deadlockDetector a deadlock detector
     */
    public ThreadStatesGaugeSet(ThreadMXBean threads,
                                ThreadDeadlockDetector deadlockDetector) {
        this.threads = threads;
        this.deadlockDetector = deadlockDetector;
    }

    /**
     * Builds the thread gauges: one count per {@link Thread.State}, the total thread count,
     * the daemon thread count, the number of deadlocked threads and the deadlocked threads
     * themselves.
     *
     * @return an unmodifiable map of metric names to gauges
     */
    @Override
    public Map<MetricName, Metric> getMetrics() {
        final Map<MetricName, Metric> gauges = new HashMap<MetricName, Metric>();

        // One "<state>.count" gauge per thread state.
        for (final Thread.State state : Thread.State.values()) {
            // Locale.ROOT keeps the metric name stable regardless of the JVM default locale
            // (e.g. a Turkish locale would otherwise lowercase 'I' to a dotless 'ı').
            gauges.put(name(state.toString().toLowerCase(java.util.Locale.ROOT), "count"),
                    new Gauge<Integer>() {
                        @Override
                        public Integer getValue() {
                            return getThreadCount(state);
                        }
                    });
        }

        // Total number of live threads.
        gauges.put(MetricName.build("count"), new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return threads.getThreadCount();
            }
        });

        // Number of live daemon threads.
        gauges.put(MetricName.build("daemon.count"), new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return threads.getDaemonThreadCount();
            }
        });

        // Number of deadlocked threads.
        gauges.put(MetricName.build("deadlock.count"), new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return deadlockDetector.getDeadlockedThreads().size();
            }
        });

        // The deadlocked threads themselves, as reported by the detector.
        gauges.put(MetricName.build("deadlocks"), new Gauge<Set<String>>() {
            @Override
            public Set<String> getValue() {
                return deadlockDetector.getDeadlockedThreads();
            }
        });

        return Collections.unmodifiableMap(gauges);
    }

    /**
     * Counts the threads currently in the given state.
     *
     * @param state the thread state to count
     * @return the number of thread infos whose state equals {@code state}
     */
    private int getThreadCount(Thread.State state) {
        final ThreadInfo[] allThreads = getThreadInfo();
        int count = 0;
        for (ThreadInfo info : allThreads) {
            // Null entries in the array are skipped.
            if (info != null && info.getThreadState() == state) {
                count++;
            }
        }
        return count;
    }

    /**
     * Fetches the thread info of all thread ids known to the MXBean, without stack traces.
     *
     * @return the thread info array returned by the MXBean
     */
    ThreadInfo[] getThreadInfo() {
        return threads.getThreadInfo(threads.getAllThreadIds(), STACK_TRACE_DEPTH);
    }

}

metrics-servlet

AbstractInstrumentedFilter.java

实现Filter覆写:init,destroy,doFilter,嵌入监控对象。实现AsyncListener,覆写方法,嵌入监控对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
package io.dropwizard.metrics.servlet;

import static io.dropwizard.metrics.MetricRegistry.name;

import io.dropwizard.metrics.Counter;
import io.dropwizard.metrics.Meter;
import io.dropwizard.metrics.MetricRegistry;
import io.dropwizard.metrics.Timer;

import javax.servlet.*;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServletResponseWrapper;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* {@link Filter} implementation which captures request information and a breakdown of the response
* codes being returned.
* <p>
* Every request passing through the filter is timed and counted as active while in flight; on
* completion a meter is marked for its status code, for an error, or (async only) for a timeout.
* Asynchronous requests are finished by an {@link AsyncListener} registered in
* {@link #doFilter(ServletRequest, ServletResponse, FilterChain)}.
*/
public abstract class AbstractInstrumentedFilter implements Filter {
// Name of the filter init-parameter that supplies the metric name prefix.
static final String METRIC_PREFIX = "name-prefix";

// Name of the catch-all meter used for status codes without a dedicated meter.
private final String otherMetricName;
// Meter names keyed by the HTTP status code they track.
private final Map<Integer, String> meterNamesByStatusCode;
// Servlet-context attribute under which the MetricRegistry is looked up.
private final String registryAttribute;

// initialized after call of init method
// Meters keyed by HTTP status code.
private ConcurrentMap<Integer, Meter> metersByStatusCode;
// Catch-all meter for status codes not present in metersByStatusCode.
private Meter otherMeter;
// Marked when an asynchronous request times out.
private Meter timeoutsMeter;
// Marked when request processing fails with an exception or an async error.
private Meter errorsMeter;
// Number of requests currently in flight (incremented on entry, decremented on completion).
private Counter activeRequests;
// Times each request from entry into the filter until completion.
private Timer requestTimer;


/**
* Creates a new instance of the filter.
*
* @param registryAttribute the attribute used to look up the metrics registry in the
* servlet context
* @param meterNamesByStatusCode A map, keyed by status code, of meter names that we are
* interested in.
* @param otherMetricName The name used for the catch-all meter.
*/
protected AbstractInstrumentedFilter(String registryAttribute,
Map<Integer, String> meterNamesByStatusCode,
String otherMetricName) {
this.registryAttribute = registryAttribute;
this.meterNamesByStatusCode = meterNamesByStatusCode;
this.otherMetricName = otherMetricName;
}

/**
* Resolves the registry and creates all meters, the counter and the timer.
* <p>
* Metric names are prefixed with the {@code name-prefix} init-parameter, or with this
* filter's class name when the parameter is missing or empty.
*
* @param filterConfig the filter configuration supplied by the container
* @throws ServletException declared by the {@link Filter} contract
*/
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// Registry from the servlet context, or a fresh one if none is configured.
final MetricRegistry metricsRegistry = getMetricsFactory(filterConfig);

String metricName = filterConfig.getInitParameter(METRIC_PREFIX);
// Fall back to the concrete filter class name as the prefix.
if (metricName == null || metricName.isEmpty()) {
metricName = getClass().getName();
}

// One meter per configured status code.
this.metersByStatusCode = new ConcurrentHashMap<Integer, Meter>(meterNamesByStatusCode
.size());
for (Entry<Integer, String> entry : meterNamesByStatusCode.entrySet()) {
metersByStatusCode.put(entry.getKey(),
metricsRegistry.meter(name(metricName, entry.getValue())));
}
// Remaining metrics: catch-all, timeouts, errors, active requests and the request timer.
this.otherMeter = metricsRegistry.meter(name(metricName,
otherMetricName));
this.timeoutsMeter = metricsRegistry.meter(name(metricName,
"timeouts"));
this.errorsMeter = metricsRegistry.meter(name(metricName,
"errors"));
this.activeRequests = metricsRegistry.counter(name(metricName,
"activeRequests"));
this.requestTimer = metricsRegistry.timer(name(metricName,
"requests"));

}

/**
* Looks up the {@link MetricRegistry} stored in the servlet context under
* {@code registryAttribute}.
*
* @param filterConfig the filter configuration giving access to the servlet context
* @return the registry from the context, or a newly created registry when the attribute is
* absent or is not a {@link MetricRegistry}
*/
private MetricRegistry getMetricsFactory(FilterConfig filterConfig) {
final MetricRegistry metricsRegistry;

final Object o = filterConfig.getServletContext().getAttribute(this.registryAttribute);
if (o instanceof MetricRegistry) {
metricsRegistry = (MetricRegistry) o;
} else {
metricsRegistry = new MetricRegistry();
}
return metricsRegistry;
}

/**
* No-op; the filter holds no resources that need releasing.
*/
@Override
public void destroy() {

}

/**
* Times the request, tracks it as active, and marks the matching meter once it finishes.
* <p>
* The response is wrapped so the status code set downstream can be read afterwards.
* {@link IOException}, {@link ServletException} and {@link RuntimeException} are recorded as
* errors and rethrown unchanged.
*
* @param request the incoming request
* @param response the outgoing response; cast to {@link HttpServletResponse}
* @param chain the remaining filter chain
* @throws IOException rethrown from the chain
* @throws ServletException rethrown from the chain
*/
@Override
public void doFilter(ServletRequest request,
ServletResponse response,
FilterChain chain) throws IOException, ServletException {
final StatusExposingServletResponse wrappedResponse = new StatusExposingServletResponse((HttpServletResponse) response);
// The request is now in flight.
activeRequests.inc();
// Start timing.
final Timer.Context context = requestTimer.time();
// true once the chain has thrown one of the exceptions caught below.
boolean error = false;
try {
chain.doFilter(request, wrappedResponse);
} catch (IOException e) {
error = true;
throw e;
} catch (ServletException e) {
error = true;
throw e;
} catch (RuntimeException e) {
error = true;
throw e;
} finally {
// No error and async processing started: completion is handled later by the listener.
if (!error && request.isAsyncStarted()) {
// The listener stops the timer and updates the meters when the async cycle ends.
request.getAsyncContext().addListener(new AsyncResultListener(context));
} else {
// Synchronous request, or the chain failed: finish the bookkeeping here.
context.stop();
activeRequests.dec();
if (error) {
// Count the failure.
errorsMeter.mark();
} else {
markMeterForStatusCode(wrappedResponse.getStatus());
}
}
}
}

/**
* Marks the meter registered for the given status code, or the catch-all meter when no
* dedicated meter exists.
*
* @param status the HTTP status code of the response
*/
private void markMeterForStatusCode(int status) {
final Meter metric = metersByStatusCode.get(status);
if (metric != null) {
metric.mark();
} else {
otherMeter.mark();
}
}

// Response wrapper that remembers the last status code passed to sendError/setStatus.
private static class StatusExposingServletResponse extends HttpServletResponseWrapper {
// The Servlet spec says: calling setStatus is optional, if no status is set, the default is 200.
private int httpStatus = 200;

public StatusExposingServletResponse(HttpServletResponse response) {
super(response);
}

@Override
public void sendError(int sc) throws IOException {
httpStatus = sc;
super.sendError(sc);
}

@Override
public void sendError(int sc, String msg) throws IOException {
httpStatus = sc;
super.sendError(sc, msg);
}

@Override
public void setStatus(int sc) {
httpStatus = sc;
super.setStatus(sc);
}

@Override
public void setStatus(int sc, String sm) {
httpStatus = sc;
super.setStatus(sc, sm);
}

// Returns the recorded status rather than delegating to the wrapped response.
public int getStatus() {
return httpStatus;
}
}

/**
* Finishes the metrics bookkeeping for a request that switched to asynchronous processing.
* <p>
* Servlet 3.0 notifies an {@link AsyncListener} of four events: {@code onStartAsync} when an
* asynchronous cycle starts, {@code onError} when it fails, {@code onTimeout} when it times
* out and {@code onComplete} when it finishes. A listener is registered by passing it to
* {@code AsyncContext.addListener()}.
* <p>
* Timeout and error each stop the timer, decrement the active-request counter and mark their
* meter; {@code done} then prevents {@code onComplete} from doing the bookkeeping a second
* time.
*/
private class AsyncResultListener implements AsyncListener {

private Timer.Context context;
// Set to true once onTimeout or onError has already finished the bookkeeping.
private boolean done = false;

public AsyncResultListener(Timer.Context context) {
this.context = context;
}

@Override
public void onComplete(AsyncEvent event) throws IOException {
// Only when neither a timeout nor an error was recorded.
if (!done) {
// NOTE(review): the listener is registered with the single-argument addListener();
// the Servlet API documents getSuppliedResponse() as returning null in that case.
// Confirm container behaviour, otherwise getStatus() below can throw an NPE.
HttpServletResponse suppliedResponse = (HttpServletResponse) event.getSuppliedResponse();
// Stop timing.
context.stop();
// The request is no longer in flight.
activeRequests.dec();
markMeterForStatusCode(suppliedResponse.getStatus());
}
}

@Override
public void onTimeout(AsyncEvent event) throws IOException {
context.stop();
activeRequests.dec();
// Count the timeout.
timeoutsMeter.mark();
done = true;
}

@Override
public void onError(AsyncEvent event) throws IOException {
context.stop();
activeRequests.dec();
// Count the error.
errorsMeter.mark();
done = true;
}

// Nothing to record when a new asynchronous cycle starts.
@Override
public void onStartAsync(AsyncEvent event) throws IOException {

}
}
}

InstrumentedFilter.java

继承AbstractInstrumentedFilter,为一组默认的响应状态码(200、201、204、400、404、500)注册以"responseCodes."为前缀的meter,其余状态码统一计入"responseCodes.other"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package io.dropwizard.metrics.servlet;

import java.util.HashMap;
import java.util.Map;

/**
 * Ready-to-use {@link AbstractInstrumentedFilter} that tracks a default set of response codes
 * (200, 201, 204, 400, 404 and 500); every other code is counted by a catch-all meter.
 * <p>Register it in the deployment descriptor like this:</p>
 * <pre>{@code
 * <filter>
 * <filter-name>instrumentedFilter</filter-name>
 * <filter-class>io.dropwizard.metrics.servlet.InstrumentedFilter</filter-class>
 * </filter>
 * <filter-mapping>
 * <filter-name>instrumentedFilter</filter-name>
 * <url-pattern>/*</url-pattern>
 * </filter-mapping>
 * }</pre>
 */
public class InstrumentedFilter extends AbstractInstrumentedFilter {
    /** Servlet-context attribute under which the metric registry is expected. */
    public static final String REGISTRY_ATTRIBUTE = InstrumentedFilter.class.getName() + ".registry";

    /** Prefix shared by all response-code meter names. */
    private static final String NAME_PREFIX = "responseCodes.";

    private static final int OK = 200;
    private static final int CREATED = 201;
    private static final int NO_CONTENT = 204;
    private static final int BAD_REQUEST = 400;
    private static final int NOT_FOUND = 404;
    private static final int SERVER_ERROR = 500;

    /**
     * Creates a new instance of the filter.
     */
    public InstrumentedFilter() {
        super(REGISTRY_ATTRIBUTE, createMeterNamesByStatusCode(), NAME_PREFIX + "other");
    }

    /**
     * Builds the status-code to meter-name table handed to the parent filter.
     *
     * @return a mutable map with one entry per tracked status code
     */
    private static Map<Integer, String> createMeterNamesByStatusCode() {
        final int[] codes = {OK, CREATED, NO_CONTENT, BAD_REQUEST, NOT_FOUND, SERVER_ERROR};
        final String[] suffixes = {"ok", "created", "noContent", "badRequest", "notFound", "serverError"};

        final Map<Integer, String> names = new HashMap<Integer, String>(6);
        for (int i = 0; i < codes.length; i++) {
            names.put(codes[i], NAME_PREFIX + suffixes[i]);
        }
        return names;
    }
}

InstrumentedFilterContextListener.java

实现ServletContextListener监听器,嵌入监控对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package io.dropwizard.metrics.servlet;

import io.dropwizard.metrics.MetricRegistry;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

/**
 * Context listener that publishes a {@link MetricRegistry} in the servlet context under
 * {@link InstrumentedFilter#REGISTRY_ATTRIBUTE}. Subclasses supply the registry by
 * implementing {@link #getMetricRegistry()}.
 */
public abstract class InstrumentedFilterContextListener implements ServletContextListener {

    /**
     * Supplies the registry to publish.
     *
     * @return the {@link MetricRegistry} to inject into the servlet context.
     */
    protected abstract MetricRegistry getMetricRegistry();

    /**
     * Stores the registry as a servlet-context attribute when the context starts.
     *
     * @param sce the context-initialized event
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        sce.getServletContext()
                .setAttribute(InstrumentedFilter.REGISTRY_ATTRIBUTE, getMetricRegistry());
    }

    /**
     * Nothing to clean up when the context shuts down.
     *
     * @param sce the context-destroyed event
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // intentionally empty
    }
}

metrics-servlets

AdminServlet.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package io.dropwizard.metrics.servlets;

import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.MessageFormat;

/**
 * Aggregating servlet: serves an HTML menu at its root and dispatches sub-paths to the
 * health-check, metrics, ping, thread-dump and CPU-profile servlets.
 */
public class AdminServlet extends HttpServlet {
    public static final String DEFAULT_HEALTHCHECK_URI = "/healthcheck";
    public static final String DEFAULT_METRICS_URI = "/metrics";
    public static final String DEFAULT_PING_URI = "/ping";
    public static final String DEFAULT_THREADS_URI = "/threads";
    public static final String DEFAULT_CPU_PROFILE_URI = "/pprof";

    public static final String METRICS_URI_PARAM_KEY = "metrics-uri";
    public static final String PING_URI_PARAM_KEY = "ping-uri";
    public static final String THREADS_URI_PARAM_KEY = "threads-uri";
    public static final String HEALTHCHECK_URI_PARAM_KEY = "healthcheck-uri";
    public static final String SERVICE_NAME_PARAM_KEY= "service-name";
    public static final String CPU_PROFILE_URI_PARAM_KEY = "cpu-profile-uri";

    // MessageFormat pattern of the menu page; {0}..{9} are path/URI pairs, {10} the service suffix.
    private static final String TEMPLATE = String.format(
            "<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\"%n" +
            " \"http://www.w3.org/TR/html4/loose.dtd\">%n" +
            "<html>%n" +
            "<head>%n" +
            " <title>Metrics{10}</title>%n" +
            "</head>%n" +
            "<body>%n" +
            " <h1>Operational Menu{10}</h1>%n" +
            " <ul>%n" +
            " <li><a href=\"{0}{1}?pretty=true\">Metrics</a></li>%n" +
            " <li><a href=\"{2}{3}\">Ping</a></li>%n" +
            " <li><a href=\"{4}{5}\">Threads</a></li>%n" +
            " <li><a href=\"{6}{7}?pretty=true\">Healthcheck</a></li>%n" +
            " <li><a href=\"{8}{9}\">CPU Profile</a></li>%n" +
            " <li><a href=\"{8}{9}?state=blocked\">CPU Contention</a></li>%n" +
            " </ul>%n" +
            "</body>%n" +
            "</html>"
    );
    private static final String CONTENT_TYPE = "text/html";
    private static final long serialVersionUID = -2850794040708785318L;

    private transient HealthCheckServlet healthCheckServlet;
    private transient MetricsServlet metricsServlet;
    private transient PingServlet pingServlet;
    private transient ThreadDumpServlet threadDumpServlet;
    private transient CpuProfileServlet cpuProfileServlet;

    private transient String metricsUri;
    private transient String pingUri;
    private transient String threadsUri;
    private transient String healthcheckUri;
    private transient String cpuprofileUri;
    private transient String serviceName;

    /**
     * Creates and initializes the delegate servlets, then reads the URI overrides and the
     * optional service name from the init-parameters.
     *
     * @param config the servlet configuration, shared with every delegate
     * @throws ServletException if this servlet or a delegate fails to initialize
     */
    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);

        healthCheckServlet = new HealthCheckServlet();
        healthCheckServlet.init(config);

        metricsServlet = new MetricsServlet();
        metricsServlet.init(config);

        pingServlet = new PingServlet();
        pingServlet.init(config);

        threadDumpServlet = new ThreadDumpServlet();
        threadDumpServlet.init(config);

        cpuProfileServlet = new CpuProfileServlet();
        cpuProfileServlet.init(config);

        metricsUri = getParam(config.getInitParameter(METRICS_URI_PARAM_KEY), DEFAULT_METRICS_URI);
        pingUri = getParam(config.getInitParameter(PING_URI_PARAM_KEY), DEFAULT_PING_URI);
        threadsUri = getParam(config.getInitParameter(THREADS_URI_PARAM_KEY), DEFAULT_THREADS_URI);
        healthcheckUri = getParam(config.getInitParameter(HEALTHCHECK_URI_PARAM_KEY), DEFAULT_HEALTHCHECK_URI);
        cpuprofileUri = getParam(config.getInitParameter(CPU_PROFILE_URI_PARAM_KEY), DEFAULT_CPU_PROFILE_URI);
        serviceName = getParam(config.getInitParameter(SERVICE_NAME_PARAM_KEY), null);
    }

    /**
     * Renders the HTML menu linking to each delegate servlet.
     *
     * @param req  the request; its context and servlet path form the link prefix
     * @param resp the response the page is written to
     * @throws ServletException declared by the servlet contract
     * @throws IOException      if the page cannot be written
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        final String path = req.getContextPath() + req.getServletPath();

        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");
        resp.setContentType(CONTENT_TYPE);

        final PrintWriter writer = resp.getWriter();
        try {
            // The title suffix is empty unless a service name was configured.
            final String titleSuffix = serviceName == null ? "" : " (" + serviceName + ")";
            final String page = MessageFormat.format(TEMPLATE,
                    path, metricsUri,
                    path, pingUri,
                    path, threadsUri,
                    path, healthcheckUri,
                    path, cpuprofileUri,
                    titleSuffix);
            writer.println(page);
        } finally {
            writer.close();
        }
    }

    /**
     * Routes the request by its path info: the root goes to this servlet's own handling,
     * known URIs go to the matching delegate, anything else yields a 404.
     *
     * @param req  the incoming request
     * @param resp the response
     * @throws ServletException propagated from the handling servlet
     * @throws IOException      propagated from the handling servlet
     */
    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        final String uri = req.getPathInfo();

        if (uri == null || uri.equals("/")) {
            super.service(req, resp);
            return;
        }
        if (uri.equals(healthcheckUri)) {
            healthCheckServlet.service(req, resp);
            return;
        }
        // Prefix match: everything below the metrics URI is handled by the metrics servlet.
        if (uri.startsWith(metricsUri)) {
            metricsServlet.service(req, resp);
            return;
        }
        if (uri.equals(pingUri)) {
            pingServlet.service(req, resp);
            return;
        }
        if (uri.equals(threadsUri)) {
            threadDumpServlet.service(req, resp);
            return;
        }
        if (uri.equals(cpuprofileUri)) {
            cpuProfileServlet.service(req, resp);
            return;
        }
        resp.sendError(HttpServletResponse.SC_NOT_FOUND);
    }

    /**
     * Returns the init-parameter value, or the default when the parameter is not set.
     */
    private static String getParam(String initParam, String defaultValue) {
        return initParam != null ? initParam : defaultValue;
    }
}

CpuProfileServlet.java

CPU性能剖析(profile)的servlet:按请求参数duration、frequency、state采样,输出pprof可解析的结果;通过tryLock保证同一时刻只有一个剖析请求在执行,其余请求直接抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package io.dropwizard.metrics.servlets;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.joda.time.Duration;
import com.papertrail.profiler.CpuProfile;

/**
 * An HTTP servlet which outputs a <a href="https://github.com/gperftools/gperftools">pprof</a>
 * parseable CPU profile of the running JVM.
 * <p>
 * Request parameters: {@code duration} (seconds, default 10), {@code frequency} (default 100)
 * and {@code state} ({@code blocked} profiles blocked threads, anything else runnable ones).
 */
public class CpuProfileServlet extends HttpServlet {
    private static final long serialVersionUID = -668666696530287501L;
    private static final String CONTENT_TYPE = "pprof/raw";
    private static final String CACHE_CONTROL = "Cache-Control";
    private static final String NO_CACHE = "must-revalidate,no-cache,no-store";

    /** Guards profiling so that only one profile runs at a time. */
    private final Lock lock = new ReentrantLock();

    /**
     * Reads the profiling parameters, records a profile and streams it to the client.
     *
     * @param req  the request carrying the optional parameters
     * @param resp the response the profile is written to
     * @throws ServletException declared by the servlet contract
     * @throws IOException      if the profile cannot be written
     */
    @Override
    protected void doGet(HttpServletRequest req,
                         HttpServletResponse resp) throws ServletException, IOException {
        final int duration = intParameter(req, "duration", 10);
        final int frequency = intParameter(req, "frequency", 100);

        // Only "blocked" (case-insensitive) selects contention profiling.
        final Thread.State state = "blocked".equalsIgnoreCase(req.getParameter("state"))
                ? Thread.State.BLOCKED
                : Thread.State.RUNNABLE;

        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setHeader(CACHE_CONTROL, NO_CACHE);
        resp.setContentType(CONTENT_TYPE);

        final OutputStream output = resp.getOutputStream();
        try {
            doProfile(output, duration, frequency, state);
        } finally {
            output.close();
        }
    }

    /**
     * Reads an integer request parameter, falling back when it is absent or not a number.
     */
    private static int intParameter(HttpServletRequest req, String name, int fallback) {
        final String raw = req.getParameter(name);
        if (raw == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    /**
     * Records a CPU profile and writes it in Google profile format.
     *
     * @param out       destination of the profile
     * @param duration  recording time in seconds
     * @param frequency sampling frequency passed to the profiler
     * @param state     thread state to sample
     * @throws IOException      if writing the profile fails
     * @throws RuntimeException if another profile is already running or none could be created
     */
    protected void doProfile(OutputStream out, int duration, int frequency, Thread.State state) throws IOException {
        // Refuse instead of queueing when a profile is already being recorded.
        if (!lock.tryLock()) {
            throw new RuntimeException("Only one profile request may be active at a time");
        }
        try {
            final CpuProfile recorded = CpuProfile.record(Duration.standardSeconds(duration), frequency, state);
            if (recorded == null) {
                throw new RuntimeException("could not create CpuProfile");
            }
            recorded.writeGoogleProfile(out);
        } finally {
            lock.unlock();
        }
    }
}

HealthCheckServlet.java

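健康检查servlet:运行HealthCheckRegistry中注册的所有健康检查(若配置了ExecutorService则交由线程池执行,否则在servlet工作线程中执行),并以JSON返回结果;没有检查项返回501,全部健康返回200,否则返回500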

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package io.dropwizard.metrics.servlets;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

import io.dropwizard.metrics.json.HealthCheckModule;

import io.dropwizard.metrics.health.HealthCheck;
import io.dropwizard.metrics.health.HealthCheckRegistry;

import javax.servlet.*;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ExecutorService;


/**
 * Servlet that runs all registered health checks and reports the results as JSON.
 * <p>
 * The status is 501 when no checks are registered, 200 when all are healthy and 500 otherwise.
 * Checks run on the configured {@link ExecutorService} when there is one, else inline.
 */
public class HealthCheckServlet extends HttpServlet {

    /**
     * Context listener that publishes the {@link HealthCheckRegistry} and the optional
     * {@link ExecutorService} as servlet-context attributes for this servlet to pick up.
     */
    public static abstract class ContextListener implements ServletContextListener {
        /**
         * Supplies the registry to publish.
         *
         * @return the {@link HealthCheckRegistry} to inject into the servlet context.
         */
        protected abstract HealthCheckRegistry getHealthCheckRegistry();

        /**
         * Supplies the executor to publish.
         *
         * @return the {@link ExecutorService} to inject into the servlet context, or {@code null}
         * if the health checks should be run in the servlet worker thread.
         */
        protected ExecutorService getExecutorService() {
            // don't use a thread pool by default
            return null;
        }

        @Override
        public void contextInitialized(ServletContextEvent event) {
            final ServletContext servletContext = event.getServletContext();
            servletContext.setAttribute(HEALTH_CHECK_REGISTRY, getHealthCheckRegistry());
            servletContext.setAttribute(HEALTH_CHECK_EXECUTOR, getExecutorService());
        }

        @Override
        public void contextDestroyed(ServletContextEvent event) {
            // no-op
        }
    }

    public static final String HEALTH_CHECK_REGISTRY = HealthCheckServlet.class.getCanonicalName() + ".registry";
    public static final String HEALTH_CHECK_EXECUTOR = HealthCheckServlet.class.getCanonicalName() + ".executor";

    private static final long serialVersionUID = -8432996484889177321L;
    private static final String CONTENT_TYPE = "application/json";

    /** Registry holding the health checks to run. */
    private transient HealthCheckRegistry registry;
    /** Optional executor used to run the checks; {@code null} means run inline. */
    private transient ExecutorService executorService;
    /** JSON mapper configured with the health-check module. */
    private transient ObjectMapper mapper;

    /**
     * Creates a servlet that looks up its registry in the servlet context during {@code init}.
     */
    public HealthCheckServlet() {
    }

    /**
     * Creates a servlet bound to the given registry.
     *
     * @param registry the registry whose checks are run
     */
    public HealthCheckServlet(HealthCheckRegistry registry) {
        this.registry = registry;
    }

    /**
     * Resolves the registry (unless given at construction), the optional executor and the
     * JSON mapper.
     *
     * @param config the servlet configuration
     * @throws ServletException if no {@link HealthCheckRegistry} is available
     */
    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);

        if (registry == null) {
            final Object candidate = config.getServletContext().getAttribute(HEALTH_CHECK_REGISTRY);
            if (!(candidate instanceof HealthCheckRegistry)) {
                throw new ServletException("Couldn't find a HealthCheckRegistry instance.");
            }
            this.registry = (HealthCheckRegistry) candidate;
        }

        // The executor is optional; a missing or wrongly typed attribute leaves it unset.
        final Object candidateExecutor = config.getServletContext().getAttribute(HEALTH_CHECK_EXECUTOR);
        if (candidateExecutor instanceof ExecutorService) {
            this.executorService = (ExecutorService) candidateExecutor;
        }

        this.mapper = new ObjectMapper().registerModule(new HealthCheckModule());
    }

    /**
     * Runs the health checks and writes the results as JSON.
     *
     * @param req  the request; {@code pretty=true} enables pretty-printing
     * @param resp the response
     * @throws ServletException declared by the servlet contract
     * @throws IOException      if the results cannot be written
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        final SortedMap<String, HealthCheck.Result> results = runHealthChecks();
        resp.setContentType(CONTENT_TYPE);
        resp.setHeader("Cache-Control", "must-revalidate,no-cache,no-store");

        final int status;
        if (results.isEmpty()) {
            // Nothing registered.
            status = HttpServletResponse.SC_NOT_IMPLEMENTED;
        } else if (isAllHealthy(results)) {
            status = HttpServletResponse.SC_OK;
        } else {
            status = HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
        }
        resp.setStatus(status);

        final OutputStream output = resp.getOutputStream();
        try {
            getWriter(req).writeValue(output, results);
        } finally {
            output.close();
        }
    }

    /**
     * Picks the pretty-printing writer when the {@code pretty} parameter parses as true.
     */
    private ObjectWriter getWriter(HttpServletRequest request) {
        return Boolean.parseBoolean(request.getParameter("pretty"))
                ? mapper.writerWithDefaultPrettyPrinter()
                : mapper.writer();
    }

    /**
     * Runs every registered health check, on the executor if one is configured.
     *
     * @return the results returned by the registry
     */
    private SortedMap<String, HealthCheck.Result> runHealthChecks() {
        return executorService == null
                ? registry.runHealthChecks()
                : registry.runHealthChecks(executorService);
    }

    /**
     * Tells whether every result reports healthy.
     *
     * @param results the results to inspect
     * @return {@code false} as soon as one unhealthy result is found, {@code true} otherwise
     */
    private static boolean isAllHealthy(Map<String, HealthCheck.Result> results) {
        boolean allHealthy = true;
        for (HealthCheck.Result outcome : results.values()) {
            if (!outcome.isHealthy()) {
                allHealthy = false;
                break;
            }
        }
        return allHealthy;
    }
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×