-
Notifications
You must be signed in to change notification settings - Fork 7
/
rpc.go
115 lines (94 loc) · 2.5 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package progrock
import (
"context"
"errors"
"io"
"net"
"sync"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// ServeRPC starts a gRPC server on l that exposes a ProgressService whose
// incoming updates are forwarded to w.
//
// The server runs in a background goroutine; any error returned by Serve is
// discarded. The returned Writer is a WaitWriter wrapping w, so closing it
// gracefully stops the server before closing w. The returned error is
// currently always nil.
func ServeRPC(l net.Listener, w Writer) (Writer, error) {
	server := grpc.NewServer()
	RegisterProgressServiceServer(server, NewRPCReceiver(w))

	// Serve blocks until the server is stopped, so it gets its own goroutine.
	go server.Serve(l)

	waiter := WaitWriter{Writer: w, srv: server}
	return waiter, nil
}
// DialRPC connects to the ProgressService at target (without transport
// security) and opens a WriteUpdates stream on it.
//
// ctx is used both for dialing and for the lifetime of the update stream. On
// success the returned Writer is an *RPCWriter that sends status updates over
// the stream. If the stream cannot be opened, the freshly dialed connection is
// closed before the error is returned so that it does not leak.
func DialRPC(ctx context.Context, target string) (Writer, error) {
	conn, err := grpc.DialContext(ctx, target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}

	client := NewProgressServiceClient(conn)

	updates, err := client.WriteUpdates(ctx)
	if err != nil {
		// The caller never sees conn on this path, so release it here. The
		// stream error is the one worth reporting; a close error is dropped.
		_ = conn.Close()
		return nil, err
	}

	return NewRPCWriter(conn, updates), nil
}
// RPCWriter is a Writer that forwards status updates to a remote
// ProgressService over a client-side update stream.
type RPCWriter struct {
	// Conn is the gRPC client connection associated with the stream.
	Conn *grpc.ClientConn

	// Updates is the stream that status updates are sent on.
	Updates ProgressService_WriteUpdatesClient

	// l serializes the Updates.Send calls made through WriteStatus.
	l sync.Mutex
}
// NewRPCWriter builds an RPCWriter that sends updates on the given stream and
// records conn as its associated connection.
func NewRPCWriter(conn *grpc.ClientConn, updates ProgressService_WriteUpdatesClient) *RPCWriter {
	writer := new(RPCWriter)
	writer.Conn = conn
	writer.Updates = updates
	return writer
}
// WriteStatus implements Writer by sending status on the update stream.
//
// Sends are performed under the writer's mutex, so only one Send is in flight
// on the stream at a time.
func (w *RPCWriter) WriteStatus(status *StatusUpdate) error {
	w.l.Lock()
	defer w.l.Unlock()

	sendErr := w.Updates.Send(status)
	return sendErr
}
// Close finishes the update stream by calling CloseAndRecv on it and returns
// any error from that call; the server's reply is discarded.
//
// The writer's mutex is held for the duration so that closing the stream never
// overlaps with a Send issued by WriteStatus (gRPC streams do not allow
// closing the send side concurrently with sending a message).
//
// NOTE(review): Conn is not closed here. Callers that own the connection are
// responsible for closing it themselves.
func (w *RPCWriter) Close() error {
	w.l.Lock()
	defer w.l.Unlock()

	_, err := w.Updates.CloseAndRecv()
	return err
}
// RPCReceiver is a ProgressServiceServer that relays every update it receives
// to a Writer.
type RPCReceiver struct {
	// w is the destination for received status updates.
	w Writer

	// UnimplementedProgressServiceServer is embedded alongside the methods
	// defined on RPCReceiver itself.
	UnimplementedProgressServiceServer
}
// NewRPCReceiver builds an RPCReceiver that forwards received updates to w.
func NewRPCReceiver(w Writer) *RPCReceiver {
	receiver := new(RPCReceiver)
	receiver.w = w
	return receiver
}
// WriteUpdates implements ProgressServiceServer.
//
// It reads updates from the stream one at a time and passes each to the
// underlying Writer. When the stream reports io.EOF it replies with an empty
// message and closes the stream. Any other receive error, or any error from
// the Writer, ends the call and is returned as-is.
func (recv *RPCReceiver) WriteUpdates(srv ProgressService_WriteUpdatesServer) error {
	for {
		update, err := srv.Recv()
		switch {
		case err == nil:
			if writeErr := recv.w.WriteStatus(update); writeErr != nil {
				return writeErr
			}
		case errors.Is(err, io.EOF):
			// The client has finished sending; acknowledge and finish.
			return srv.SendAndClose(&emptypb.Empty{})
		default:
			return err
		}
	}
}
// WaitWriter wraps a Writer together with the gRPC server that feeds it, so
// that Close can stop the server before closing the wrapped Writer.
type WaitWriter struct {
	// Writer is the wrapped Writer; its methods are promoted.
	Writer

	// srv is the gRPC server stopped by Close.
	srv *grpc.Server
}
// Close gracefully stops the RPC server and then closes the wrapped Writer,
// returning the wrapped Writer's Close error.
func (ww WaitWriter) Close() error {
	// Stop the server first, then close the Writer it was feeding.
	ww.srv.GracefulStop()

	closeErr := ww.Writer.Close()
	return closeErr
}