-
Notifications
You must be signed in to change notification settings - Fork 18
/
proxy.go
184 lines (146 loc) · 5.21 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Package proxy is plugin that proxies requests.
package proxy
import (
"context"
"errors"
"fmt"
"net"
"sync/atomic"
"time"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/metrics"
"github.com/coredns/coredns/request"
"github.com/coredns/proxy/healthcheck"
"github.com/miekg/dns"
ot "github.com/opentracing/opentracing-go"
)
var (
errUnreachable = errors.New("unreachable backend")
errInvalidProtocol = errors.New("invalid protocol")
errInvalidDomain = errors.New("invalid path for proxy")
)
// Proxy represents a plugin instance that can proxy requests to another (DNS) server.
type Proxy struct {
Next plugin.Handler
// Upstreams is a pointer to a slice, so we can update the upstream (used for Google)
// midway.
Upstreams *[]Upstream
// Trace is the Trace plugin, if it is installed
// This is used by the grpc exchanger to trace through the grpc calls
Trace plugin.Handler
}
// Upstream manages a pool of proxy upstream hosts. Select should return a
// suitable upstream host, or nil if no such hosts are available.
type Upstream interface {
// The domain name this upstream host should be routed on.
From() string
// Selects an upstream host to be routed to.
Select() *healthcheck.UpstreamHost
// Checks if subdomain is not an ignored.
IsAllowedDomain(string) bool
// Exchanger returns the exchanger to be used for this upstream.
Exchanger() Exchanger
// Stops the upstream from proxying requests to shutdown goroutines cleanly.
Stop() error
}
// tryDuration is how long to try upstream hosts; failures result in
// immediate retries until this duration ends or we get a nil host.
var tryDuration = 16 * time.Second
// ServeDNS satisfies the plugin.Handler interface.
func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
var span, child ot.Span
span = ot.SpanFromContext(ctx)
state := request.Request{W: w, Req: r}
upstream := p.match(state)
if upstream == nil {
return plugin.NextOrFailure(p.Name(), p.Next, ctx, w, r)
}
for {
start := time.Now()
var reply *dns.Msg
var backendErr error
// Since Select() should give us "up" hosts, keep retrying
// hosts until timeout (or until we get a nil host).
for time.Since(start) < tryDuration {
host := upstream.Select()
if host == nil {
return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, "no upstream host")
}
if span != nil {
child = span.Tracer().StartSpan("exchange", ot.ChildOf(span.Context()))
ctx = ot.ContextWithSpan(ctx, child)
}
atomic.AddInt64(&host.Conns, 1)
RequestCount.WithLabelValues(metrics.WithServer(ctx), state.Proto(), upstream.Exchanger().Protocol(), familyToString(state.Family()), host.Name).Add(1)
reply, backendErr = upstream.Exchanger().Exchange(ctx, host.Name, state)
atomic.AddInt64(&host.Conns, -1)
if child != nil {
child.Finish()
}
taperr := toDnstap(ctx, host.Name, upstream.Exchanger(), state, reply, start)
if backendErr == nil {
// Check if the reply is correct; if not return FormErr.
if !state.Match(reply) {
formerr := new(dns.Msg)
formerr.SetRcode(state.Req, dns.RcodeFormatError)
w.WriteMsg(formerr)
return 0, taperr
}
w.WriteMsg(reply)
RequestDuration.WithLabelValues(metrics.WithServer(ctx), state.Proto(), upstream.Exchanger().Protocol(), familyToString(state.Family()), host.Name).Observe(time.Since(start).Seconds())
return 0, taperr
}
// A "ANY isc.org" query is being dropped by ISC's nameserver, we see this as a i/o timeout, but
// would then mark our upstream is being broken. We should not do this if we consider the error temporary.
// Of course it could really be that our upstream is broken
if oe, ok := backendErr.(*net.OpError); ok {
// Note this keeps looping and trying until tryDuration is hit, at which point our client
// might be long gone...
if oe.Timeout() {
// Our upstream's upstream is probably messing up, continue with next selected
// host - which my be the *same* one as we don't set any uh.Fails.
continue
}
}
timeout := host.FailTimeout
if timeout == 0 {
timeout = defaultFailTimeout
}
atomic.AddInt32(&host.Fails, 1)
fails := atomic.LoadInt32(&host.Fails)
go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
time.Sleep(timeout)
// we may go negative here, should be rectified by the HC.
atomic.AddInt32(&host.Fails, -1)
if fails%failureCheck == 0 { // Kick off healthcheck on every third failure.
host.HealthCheckURL()
}
}(host, timeout)
}
return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, backendErr)
}
}
func (p Proxy) match(state request.Request) (u Upstream) {
if p.Upstreams == nil {
return nil
}
longestMatch := 0
for _, upstream := range *p.Upstreams {
from := upstream.From()
if !plugin.Name(from).Matches(state.Name()) || !upstream.IsAllowedDomain(state.Name()) {
continue
}
if lf := len(from); lf > longestMatch {
longestMatch = lf
u = upstream
}
}
return u
}
// Name implements the Handler interface.
func (p Proxy) Name() string { return "proxy" }
const (
defaultFailTimeout = 2 * time.Second
defaultTimeout = 5 * time.Second
failureCheck = 3
)