-
Notifications
You must be signed in to change notification settings - Fork 0
/
op_ExecuteStatement.go
146 lines (114 loc) · 4.73 KB
/
op_ExecuteStatement.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package dyno
import (
"context"
ddb "github.com/aws/aws-sdk-go-v2/service/dynamodb"
"sync"
)
// ExecuteStatement creates an ExecuteStatement operation from the given input
// and middleware, invokes it with this session's context and client, and
// returns the operation. Invoke runs the request in a goroutine, so call Await
// on the returned operation to obtain the result.
func (s *Session) ExecuteStatement(input *ddb.ExecuteStatementInput, mw ...ExecuteStatementMiddleWare) *ExecuteStatement {
	op := NewExecuteStatement(input, mw...)
	return op.Invoke(s.ctx, s.ddb)
}
// ExecuteStatement creates an ExecuteStatement operation from the given input
// and middleware, submits it to this pool through Do, and returns the
// operation.
func (p *Pool) ExecuteStatement(input *ddb.ExecuteStatementInput, mw ...ExecuteStatementMiddleWare) *ExecuteStatement {
	pooled := NewExecuteStatement(input, mw...)
	// Hand the operation to the pool so that it is run there.
	p.Do(pooled)
	return pooled
}
// ExecuteStatementContext is the request context handed to every
// ExecuteStatementHandler. It embeds a context.Context, so it can be passed
// wherever a context is expected, and carries the input and client for the call.
type ExecuteStatementContext struct {
// Context is the embedded context for the request.
context.Context
// Input is the ExecuteStatement request input.
Input *ddb.ExecuteStatementInput
// Client is the DynamoDB client; ExecuteStatementFinalHandler calls its
// ExecuteStatement method.
Client *ddb.Client
}
// ExecuteStatementOutput holds the result of an ExecuteStatement operation:
// the DynamoDB output and the error. Handlers write it with Set and read it
// with Get.
type ExecuteStatementOutput struct {
// out is the output stored by Set.
out *ddb.ExecuteStatementOutput
// err is the error stored by Set.
err error
// mu is locked by Set and Get around every access to out and err.
mu sync.RWMutex
}
// Set stores the given output and error, replacing any previously stored
// values. The write is performed while holding the exclusive lock.
func (o *ExecuteStatementOutput) Set(out *ddb.ExecuteStatementOutput, err error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.out, o.err = out, err
}
// Get returns the output and error currently stored in o. Both are nil if Set
// has not been called.
func (o *ExecuteStatementOutput) Get() (out *ddb.ExecuteStatementOutput, err error) {
	// Get only reads, so the shared read lock is sufficient; taking the
	// exclusive lock here would needlessly serialize concurrent readers.
	o.mu.RLock()
	defer o.mu.RUnlock()
	return o.out, o.err
}
// ExecuteStatementHandler handles an ExecuteStatement request. An
// implementation receives the request context and the output value that holds
// the result of the request.
type ExecuteStatementHandler interface {
// HandleExecuteStatement processes the request described by ctx and
// records its result in output.
HandleExecuteStatement(ctx *ExecuteStatementContext, output *ExecuteStatementOutput)
}
// ExecuteStatementHandlerFunc is a function type that implements
// ExecuteStatementHandler, allowing a plain function to be used as a handler.
type ExecuteStatementHandlerFunc func(ctx *ExecuteStatementContext, output *ExecuteStatementOutput)
// HandleExecuteStatement implements ExecuteStatementHandler by calling the
// underlying function with the same arguments.
func (fn ExecuteStatementHandlerFunc) HandleExecuteStatement(ctx *ExecuteStatementContext, output *ExecuteStatementOutput) {
	fn(ctx, output)
}
// ExecuteStatementFinalHandler is the innermost ExecuteStatementHandler: it
// performs the actual DynamoDB ExecuteStatement call. It carries no state.
type ExecuteStatementFinalHandler struct{}
// HandleExecuteStatement implements ExecuteStatementHandler. It calls
// ExecuteStatement on the client held by ctx, using ctx itself as the request
// context and ctx.Input as the input, and stores the result in output.
func (h *ExecuteStatementFinalHandler) HandleExecuteStatement(ctx *ExecuteStatementContext, output *ExecuteStatementOutput) {
	resp, err := ctx.Client.ExecuteStatement(ctx, ctx.Input)
	output.Set(resp, err)
}
// ExecuteStatementMiddleWare is middleware used to wrap an
// ExecuteStatementHandler with another ExecuteStatementHandler.
type ExecuteStatementMiddleWare interface {
// ExecuteStatementMiddleWare receives the next handler in the chain and
// returns the handler that should be used in its place.
ExecuteStatementMiddleWare(next ExecuteStatementHandler) ExecuteStatementHandler
}
// ExecuteStatementMiddleWareFunc is a function type that implements
// ExecuteStatementMiddleWare, allowing a plain function to be used as
// middleware.
type ExecuteStatementMiddleWareFunc func(next ExecuteStatementHandler) ExecuteStatementHandler
// ExecuteStatementMiddleWare implements the ExecuteStatementMiddleWare
// interface by calling the underlying function with next and returning the
// handler it produces.
func (fn ExecuteStatementMiddleWareFunc) ExecuteStatementMiddleWare(next ExecuteStatementHandler) ExecuteStatementHandler {
	wrapped := fn(next)
	return wrapped
}
// ExecuteStatement represents an ExecuteStatement operation: the request
// input, the middleware to apply, and the embedded base operation state.
type ExecuteStatement struct {
// BaseOperation is the embedded base operation; its Await method is used
// by ExecuteStatement.Await.
// NOTE(review): SetRunning and SetResponse are presumably promoted from
// here as well — confirm against the BaseOperation definition.
*BaseOperation
// input is the request input passed to the handler chain.
input *ddb.ExecuteStatementInput
// middleWares wrap the final handler; the first element ends up outermost.
middleWares []ExecuteStatementMiddleWare
}
// NewExecuteStatement creates a new ExecuteStatement operation for the given
// input. The optional middleware is stored on the operation and applied when
// the operation is invoked.
func NewExecuteStatement(input *ddb.ExecuteStatementInput, mws ...ExecuteStatementMiddleWare) *ExecuteStatement {
	op := new(ExecuteStatement)
	op.BaseOperation = NewOperation()
	op.input = input
	op.middleWares = mws
	return op
}
// Invoke starts the ExecuteStatement operation in a new goroutine using the
// given context and client, and returns the operation immediately. Use Await
// to wait for the result.
func (op *ExecuteStatement) Invoke(ctx context.Context, client *ddb.Client) *ExecuteStatement {
	// SetRunning is called before the goroutine is launched: from here on the
	// operation is waiting for a response.
	op.SetRunning()
	go func() {
		op.InvokeDynoOperation(ctx, client)
	}()
	return op
}
// InvokeDynoOperation runs the ExecuteStatement operation synchronously. It
// builds the handler chain from the operation's middleware, passes the request
// through it, and reports the resulting output and error via SetResponse.
func (op *ExecuteStatement) InvokeDynoOperation(ctx context.Context, client *ddb.Client) {
	result := &ExecuteStatementOutput{}
	// Whatever the handler chain left in result is reported when this
	// function returns.
	defer func() {
		op.SetResponse(result.Get())
	}()

	// Start with the handler that performs the real call, then wrap it with
	// the middleware from last to first so the first middleware ends up
	// outermost and runs first.
	var chain ExecuteStatementHandler = &ExecuteStatementFinalHandler{}
	for idx := len(op.middleWares); idx > 0; idx-- {
		chain = op.middleWares[idx-1].ExecuteStatementMiddleWare(chain)
	}

	chain.HandleExecuteStatement(&ExecuteStatementContext{
		Context: ctx,
		Client:  client,
		Input:   op.input,
	}, result)
}
// Await waits for the ExecuteStatement operation to be fulfilled and returns
// its output and error. When the base operation yields a nil output, a nil
// *ddb.ExecuteStatementOutput is returned together with the error.
func (op *ExecuteStatement) Await() (*ddb.ExecuteStatementOutput, error) {
	raw, err := op.BaseOperation.Await()
	if raw != nil {
		// The base operation stores the output untyped; convert it back.
		return raw.(*ddb.ExecuteStatementOutput), err
	}
	return nil, err
}