summaryrefslogtreecommitdiff
blob: 8adf364ce164a1085683bf45681ec9c14284717d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
From 7b2292ee20c5d49053cc5262dfbc99ce121b9b74 Mon Sep 17 00:00:00 2001
From: tifayuki <tifayuki@gmail.com>
Date: Tue, 13 Feb 2018 13:30:56 -0800
Subject: [PATCH 1/4] Add notification metrics

It adds notification related prometheus metrics, including:
  - total count for events/success/failure/error
  - total count for notification per each status code
  - gauge of the pending notification queue

Signed-off-by: tifayuki <tifayuki@gmail.com>
---
 metrics/prometheus.go    |  3 +++
 notifications/metrics.go | 28 ++++++++++++++++++++++++++++
 2 files changed, 31 insertions(+)

diff --git a/metrics/prometheus.go b/metrics/prometheus.go
index b5a532144..91b32b23d 100644
--- a/metrics/prometheus.go
+++ b/metrics/prometheus.go
@@ -10,4 +10,7 @@ const (
 var (
 	// StorageNamespace is the prometheus namespace of blob/cache related operations
 	StorageNamespace = metrics.NewNamespace(NamespacePrefix, "storage", nil)
+
+	// NotificationsNamespace is the prometheus namespace of notification related metrics
+	NotificationsNamespace = metrics.NewNamespace(NamespacePrefix, "notifications", nil)
 )
diff --git a/notifications/metrics.go b/notifications/metrics.go
index a20af1687..69960e9cb 100644
--- a/notifications/metrics.go
+++ b/notifications/metrics.go
@@ -5,6 +5,18 @@ import (
 	"fmt"
 	"net/http"
 	"sync"
+
+	prometheus "github.com/docker/distribution/metrics"
+	"github.com/docker/go-metrics"
+)
+
+var (
+	// eventsCounter counts total events of incoming, success, failure, and errors
+	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type")
+	// pendingGauge measures the pending queue size
+	pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total)
+	// statusCounter counts the total notification call per each status code
+	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code")
 )
 
 // EndpointMetrics track various actions taken by the endpoint, typically by
@@ -61,6 +73,9 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
 	defer emsl.safeMetrics.Unlock()
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Successes += len(events)
+
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
+	eventsCounter.WithValues("Successes").Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
@@ -68,12 +83,17 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
 	defer emsl.safeMetrics.Unlock()
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Failures += len(events)
+
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
+	eventsCounter.WithValues("Failures").Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
 	emsl.safeMetrics.Lock()
 	defer emsl.safeMetrics.Unlock()
 	emsl.Errors += len(events)
+
+	eventsCounter.WithValues("Errors").Inc(1)
 }
 
 // endpointMetricsEventQueueListener maintains the incoming events counter and
@@ -87,12 +107,17 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
 	defer eqc.Unlock()
 	eqc.Events += len(events)
 	eqc.Pending += len(events)
+
+	eventsCounter.WithValues("Events").Inc()
+	pendingGauge.Inc(1)
 }
 
 func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
 	eqc.Lock()
 	defer eqc.Unlock()
 	eqc.Pending -= len(events)
+
+	pendingGauge.Dec(1)
 }
 
 // endpoints is global registry of endpoints used to report metrics to expvar
@@ -149,4 +174,7 @@ func init() {
 	}))
 
 	registry.(*expvar.Map).Set("notifications", &notifications)
+
+	// register prometheus metrics
+	metrics.Register(prometheus.NotificationsNamespace)
 }

From 4497e40eda1e0024f055c09ab480b7816a1147b1 Mon Sep 17 00:00:00 2001
From: Honglin Feng <tifayuki@gmail.com>
Date: Thu, 11 Oct 2018 21:39:02 +0800
Subject: [PATCH 2/4] add label to the metrics

Signed-off-by: Honglin Feng <tifayuki@gmail.com>
---
 notifications/endpoint.go   |  2 +-
 notifications/http_test.go  |  2 +-
 notifications/metrics.go    | 26 ++++++++++++++------------
 notifications/sinks_test.go |  2 +-
 4 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/notifications/endpoint.go b/notifications/endpoint.go
index a8a52d0c9..854f1dd6c 100644
--- a/notifications/endpoint.go
+++ b/notifications/endpoint.go
@@ -58,7 +58,7 @@ func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
 	endpoint.url = url
 	endpoint.EndpointConfig = config
 	endpoint.defaults()
-	endpoint.metrics = newSafeMetrics()
+	endpoint.metrics = newSafeMetrics(name)
 
 	// Configures the inmemory queue, retry, http pipeline.
 	endpoint.Sink = newHTTPSink(
diff --git a/notifications/http_test.go b/notifications/http_test.go
index de47f789e..b7845cf95 100644
--- a/notifications/http_test.go
+++ b/notifications/http_test.go
@@ -63,7 +63,7 @@ func TestHTTPSink(t *testing.T) {
 	})
 	server := httptest.NewTLSServer(serverHandler)
 
-	metrics := newSafeMetrics()
+	metrics := newSafeMetrics("")
 	sink := newHTTPSink(server.URL, 0, nil, nil,
 		&endpointMetricsHTTPStatusListener{safeMetrics: metrics})
 
diff --git a/notifications/metrics.go b/notifications/metrics.go
index 69960e9cb..4464edd8f 100644
--- a/notifications/metrics.go
+++ b/notifications/metrics.go
@@ -12,11 +12,11 @@ import (
 
 var (
 	// eventsCounter counts total events of incoming, success, failure, and errors
-	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type")
+	eventsCounter = prometheus.NotificationsNamespace.NewLabeledCounter("events", "The number of total events", "type", "to")
 	// pendingGauge measures the pending queue size
-	pendingGauge = prometheus.NotificationsNamespace.NewGauge("pending", "The gauge of pending events in queue", metrics.Total)
+	pendingGauge = prometheus.NotificationsNamespace.NewLabeledGauge("pending", "The gauge of pending events in queue", metrics.Total, "to")
 	// statusCounter counts the total notification call per each status code
-	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code")
+	statusCounter = prometheus.NotificationsNamespace.NewLabeledCounter("status", "The number of status code", "code", "to")
 )
 
 // EndpointMetrics track various actions taken by the endpoint, typically by
@@ -34,14 +34,16 @@ type EndpointMetrics struct {
 // safeMetrics guards the metrics implementation with a lock and provides a
 // safe update function.
 type safeMetrics struct {
+	EndpointName string
 	EndpointMetrics
 	sync.Mutex // protects statuses map
 }
 
 // newSafeMetrics returns safeMetrics with map allocated.
-func newSafeMetrics() *safeMetrics {
+func newSafeMetrics(name string) *safeMetrics {
 	var sm safeMetrics
 	sm.Statuses = make(map[string]int)
+	sm.EndpointName = name
 	return &sm
 }
 
@@ -74,8 +76,8 @@ func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Eve
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Successes += len(events)
 
-	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
-	eventsCounter.WithValues("Successes").Inc(1)
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
+	eventsCounter.WithValues("Successes", emsl.EndpointName).Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
@@ -84,8 +86,8 @@ func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Eve
 	emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
 	emsl.Failures += len(events)
 
-	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status))).Inc(1)
-	eventsCounter.WithValues("Failures").Inc(1)
+	statusCounter.WithValues(fmt.Sprintf("%d %s", status, http.StatusText(status)), emsl.EndpointName).Inc(1)
+	eventsCounter.WithValues("Failures", emsl.EndpointName).Inc(1)
 }
 
 func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
@@ -93,7 +95,7 @@ func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
 	defer emsl.safeMetrics.Unlock()
 	emsl.Errors += len(events)
 
-	eventsCounter.WithValues("Errors").Inc(1)
+	eventsCounter.WithValues("Errors", emsl.EndpointName).Inc(1)
 }
 
 // endpointMetricsEventQueueListener maintains the incoming events counter and
@@ -108,8 +110,8 @@ func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
 	eqc.Events += len(events)
 	eqc.Pending += len(events)
 
-	eventsCounter.WithValues("Events").Inc()
-	pendingGauge.Inc(1)
+	eventsCounter.WithValues("Events", eqc.EndpointName).Inc()
+	pendingGauge.WithValues(eqc.EndpointName).Inc(1)
 }
 
 func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
@@ -117,7 +119,7 @@ func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
 	defer eqc.Unlock()
 	eqc.Pending -= len(events)
 
-	pendingGauge.Dec(1)
+	pendingGauge.WithValues(eqc.EndpointName).Dec(1)
 }
 
 // endpoints is global registry of endpoints used to report metrics to expvar
diff --git a/notifications/sinks_test.go b/notifications/sinks_test.go
index 06f88b2c9..4a69486b5 100644
--- a/notifications/sinks_test.go
+++ b/notifications/sinks_test.go
@@ -66,7 +66,7 @@ func TestBroadcaster(t *testing.T) {
 func TestEventQueue(t *testing.T) {
 	const nevents = 1000
 	var ts testSink
-	metrics := newSafeMetrics()
+	metrics := newSafeMetrics("")
 	eq := newEventQueue(
 		// delayed sync simulates destination slower than channel comms
 		&delayedSink{

From 73e4232b5171c2988b0daeea517aa07386e7945d Mon Sep 17 00:00:00 2001
From: Honglin Feng <tifayuki@gmail.com>
Date: Mon, 15 Oct 2018 19:50:38 +0800
Subject: [PATCH 3/4] run go fmt

Signed-off-by: Honglin Feng <tifayuki@gmail.com>
---
 registry/storage/driver/s3-aws/s3.go     | 10 +++++-----
 registry/storage/linkedblobstore.go      | 16 ++++++++--------
 registry/storage/linkedblobstore_test.go |  4 ++--
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
index 800435d01..9cd87dbab 100644
--- a/registry/storage/driver/s3-aws/s3.go
+++ b/registry/storage/driver/s3-aws/s3.go
@@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) {
 	// }
 
 	d := &driver{
-		S3:                          s3obj,
-		Bucket:                      params.Bucket,
-		ChunkSize:                   params.ChunkSize,
-		Encrypt:                     params.Encrypt,
-		KeyID:                       params.KeyID,
+		S3:        s3obj,
+		Bucket:    params.Bucket,
+		ChunkSize: params.ChunkSize,
+		Encrypt:   params.Encrypt,
+		KeyID:     params.KeyID,
 		MultipartCopyChunkSize:      params.MultipartCopyChunkSize,
 		MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
 		MultipartCopyThresholdSize:  params.MultipartCopyThresholdSize,
diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go
index de591c8a5..3fb1da26f 100644
--- a/registry/storage/linkedblobstore.go
+++ b/registry/storage/linkedblobstore.go
@@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
 	}
 
 	bw := &blobWriter{
-		ctx:                    ctx,
-		blobStore:              lbs,
-		id:                     uuid,
-		startedAt:              startedAt,
-		digester:               digest.Canonical.Digester(),
-		fileWriter:             fw,
-		driver:                 lbs.driver,
-		path:                   path,
+		ctx:        ctx,
+		blobStore:  lbs,
+		id:         uuid,
+		startedAt:  startedAt,
+		digester:   digest.Canonical.Digester(),
+		fileWriter: fw,
+		driver:     lbs.driver,
+		path:       path,
 		resumableDigestEnabled: lbs.resumableDigestEnabled,
 	}
 
diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go
index e0ffd2796..85376f715 100644
--- a/registry/storage/linkedblobstore_test.go
+++ b/registry/storage/linkedblobstore_test.go
@@ -162,8 +162,8 @@ type mockBlobDescriptorServiceFactory struct {
 func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
 	return &mockBlobDescriptorService{
 		BlobDescriptorService: svc,
-		t:                     f.t,
-		stats:                 f.stats,
+		t:     f.t,
+		stats: f.stats,
 	}
 }
 

From 5c66b577b027e3b314680f245be4213a002fcee0 Mon Sep 17 00:00:00 2001
From: Honglin Feng <tifayuki@gmail.com>
Date: Mon, 15 Oct 2018 20:18:36 +0800
Subject: [PATCH 4/4] run go fmt and goimports

Signed-off-by: Honglin Feng <tifayuki@gmail.com>
---
 registry/storage/driver/s3-aws/s3.go     | 10 +++++-----
 registry/storage/linkedblobstore.go      | 16 ++++++++--------
 registry/storage/linkedblobstore_test.go |  4 ++--
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/registry/storage/driver/s3-aws/s3.go b/registry/storage/driver/s3-aws/s3.go
index 9cd87dbab..800435d01 100644
--- a/registry/storage/driver/s3-aws/s3.go
+++ b/registry/storage/driver/s3-aws/s3.go
@@ -476,11 +476,11 @@ func New(params DriverParameters) (*Driver, error) {
 	// }
 
 	d := &driver{
-		S3:        s3obj,
-		Bucket:    params.Bucket,
-		ChunkSize: params.ChunkSize,
-		Encrypt:   params.Encrypt,
-		KeyID:     params.KeyID,
+		S3:                          s3obj,
+		Bucket:                      params.Bucket,
+		ChunkSize:                   params.ChunkSize,
+		Encrypt:                     params.Encrypt,
+		KeyID:                       params.KeyID,
 		MultipartCopyChunkSize:      params.MultipartCopyChunkSize,
 		MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
 		MultipartCopyThresholdSize:  params.MultipartCopyThresholdSize,
diff --git a/registry/storage/linkedblobstore.go b/registry/storage/linkedblobstore.go
index 3fb1da26f..de591c8a5 100644
--- a/registry/storage/linkedblobstore.go
+++ b/registry/storage/linkedblobstore.go
@@ -312,14 +312,14 @@ func (lbs *linkedBlobStore) newBlobUpload(ctx context.Context, uuid, path string
 	}
 
 	bw := &blobWriter{
-		ctx:        ctx,
-		blobStore:  lbs,
-		id:         uuid,
-		startedAt:  startedAt,
-		digester:   digest.Canonical.Digester(),
-		fileWriter: fw,
-		driver:     lbs.driver,
-		path:       path,
+		ctx:                    ctx,
+		blobStore:              lbs,
+		id:                     uuid,
+		startedAt:              startedAt,
+		digester:               digest.Canonical.Digester(),
+		fileWriter:             fw,
+		driver:                 lbs.driver,
+		path:                   path,
 		resumableDigestEnabled: lbs.resumableDigestEnabled,
 	}
 
diff --git a/registry/storage/linkedblobstore_test.go b/registry/storage/linkedblobstore_test.go
index 85376f715..e0ffd2796 100644
--- a/registry/storage/linkedblobstore_test.go
+++ b/registry/storage/linkedblobstore_test.go
@@ -162,8 +162,8 @@ type mockBlobDescriptorServiceFactory struct {
 func (f *mockBlobDescriptorServiceFactory) BlobAccessController(svc distribution.BlobDescriptorService) distribution.BlobDescriptorService {
 	return &mockBlobDescriptorService{
 		BlobDescriptorService: svc,
-		t:     f.t,
-		stats: f.stats,
+		t:                     f.t,
+		stats:                 f.stats,
 	}
 }