@@ -21,19 +21,6 @@ type DefaultListener struct {
21
21
controllers []control.Func
22
22
}
23
23
24
- type combinedListener struct {
25
- net.Listener
26
- locker * FileLocker // for unix domain socket
27
- }
28
-
29
- func (cl * combinedListener ) Close () error {
30
- if cl .locker != nil {
31
- cl .locker .Release ()
32
- cl .locker = nil
33
- }
34
- return cl .Listener .Close ()
35
- }
36
-
37
24
func getControlFunc (ctx context.Context , sockopt * SocketConfig , controllers []control.Func ) func (network , address string , c syscall.RawConn ) error {
38
25
return func (network , address string , c syscall.RawConn ) error {
39
26
return c .Control (func (fd uintptr ) {
@@ -54,6 +41,40 @@ func getControlFunc(ctx context.Context, sockopt *SocketConfig, controllers []co
54
41
}
55
42
}
56
43
44
+ // For some reason, other component of ray will assume the listener is a TCP listener and have valid remote address.
45
+ // But in fact it doesn't. So we need to wrap the listener to make it return 0.0.0.0(unspecified) as remote address.
46
+ // If other issues encountered, we should able to fix it here.
47
+ type listenUDSWrapper struct {
48
+ net.Listener
49
+ locker * FileLocker
50
+ }
51
+
52
+ func (l * listenUDSWrapper ) Accept () (net.Conn , error ) {
53
+ conn , err := l .Listener .Accept ()
54
+ if err != nil {
55
+ return nil , err
56
+ }
57
+ return & listenUDSWrapperConn {Conn : conn }, nil
58
+ }
59
+
60
+ func (l * listenUDSWrapper ) Close () error {
61
+ if l .locker != nil {
62
+ l .locker .Release ()
63
+ l .locker = nil
64
+ }
65
+ return l .Listener .Close ()
66
+ }
67
+
68
+ type listenUDSWrapperConn struct {
69
+ net.Conn
70
+ }
71
+
72
+ func (conn * listenUDSWrapperConn ) RemoteAddr () net.Addr {
73
+ return & net.TCPAddr {
74
+ IP : []byte {0 , 0 , 0 , 0 },
75
+ }
76
+ }
77
+
57
78
func (dl * DefaultListener ) Listen (ctx context.Context , addr net.Addr , sockopt * SocketConfig ) (l net.Listener , err error ) {
58
79
var lc net.ListenConfig
59
80
var network , address string
@@ -113,9 +134,9 @@ func (dl *DefaultListener) Listen(ctx context.Context, addr net.Addr, sockopt *S
113
134
callback = func (l net.Listener , err error ) (net.Listener , error ) {
114
135
if err != nil {
115
136
locker .Release ()
116
- return l , err
137
+ return nil , err
117
138
}
118
- l = & combinedListener {Listener : l , locker : locker }
139
+ l = & listenUDSWrapper {Listener : l , locker : locker }
119
140
if filePerm == nil {
120
141
return l , nil
121
142
}
@@ -129,9 +150,8 @@ func (dl *DefaultListener) Listen(ctx context.Context, addr net.Addr, sockopt *S
129
150
}
130
151
}
131
152
132
- l , err = lc .Listen (ctx , network , address )
133
- l , err = callback (l , err )
134
- if sockopt != nil && sockopt .AcceptProxyProtocol {
153
+ l , err = callback (lc .Listen (ctx , network , address ))
154
+ if err == nil && sockopt != nil && sockopt .AcceptProxyProtocol {
135
155
policyFunc := func (upstream net.Addr ) (proxyproto.Policy , error ) { return proxyproto .REQUIRE , nil }
136
156
l = & proxyproto.Listener {Listener : l , Policy : policyFunc }
137
157
}
0 commit comments