-
Notifications
You must be signed in to change notification settings - Fork 125
/
loader.go
140 lines (127 loc) · 3.72 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// +build ignore
package goflow
import (
"encoding/json"
"io/ioutil"
"reflect"
"strings"
)
// graphDescription is the internal Go mirror of the NoFlo JSON graph format.
// It exists only as a decoding target for encoding/json; JSON keys are matched
// against the field names case-insensitively because no explicit key names
// are given in the tags.
type graphDescription struct {
	// Properties carries graph-level attributes.
	Properties struct {
		// Name is the name of the graph; ParseJSON registers the graph as a
		// reusable component under this name when it is non-empty.
		Name string
	}

	// Processes maps each process name to the component it instantiates.
	Processes map[string]struct {
		// Component is the registered name of the component to create.
		Component string
		// Metadata selects the process mode: a positive PoolSize requests
		// pool mode, otherwise Sync requests synchronous mode.
		Metadata struct {
			Sync     bool  `json:",omitempty"`
			PoolSize int64 `json:",omitempty"`
		} `json:",omitempty"`
	}

	// Connections lists both regular connections and initial information
	// packets (IIPs). An entry with non-nil Data is treated as an IIP sent to
	// Tgt; otherwise it connects Src to Tgt.
	Connections []struct {
		// Data is the IIP payload, decoded as a generic JSON value.
		Data interface{} `json:",omitempty"`
		// Src is the sending end of a regular connection.
		Src struct {
			Process, Port string
		} `json:",omitempty"`
		// Tgt is the receiving end of a connection or the IIP destination.
		Tgt struct {
			Process, Port string
		}
		// Metadata.Buffer is the buffer size passed along when connecting.
		Metadata struct {
			Buffer int `json:",omitempty"`
		} `json:",omitempty"`
	}

	// Exports publishes internal ports on the graph itself. Private has the
	// form "process.port"; Public is the name exposed by the graph.
	Exports []struct {
		Private, Public string
	}
}
// ParseJSON converts a JSON network definition (NoFlo graph format) into
// a Graph object that can be run or used in other networks.
//
// It returns nil when js cannot be decoded into a graph description. When the
// description carries a non-empty properties name, the graph constructor is
// also passed to Register under that name so the graph can be reused as a
// component.
//
// ParseJSON panics when an export entry is malformed: the private port is not
// written as "process.port", the process is not a pointer to a struct, the
// named field does not exist, or the field is not a channel.
func ParseJSON(js []byte) *Graph {
	// Parse JSON into Go struct
	var descr graphDescription
	if err := json.Unmarshal(js, &descr); err != nil {
		return nil
	}

	// maxPoolSize is the largest pool size that survives the conversion to
	// uint8 below; larger requests are clamped instead of silently wrapping
	// around (e.g. 256 would otherwise become 0).
	const maxPoolSize = 1<<8 - 1

	// constructor builds a fresh Graph from the decoded description each time
	// it is called, so a registered graph yields independent instances.
	constructor := func() interface{} {
		// Create a new Graph
		net := new(Graph)
		net.InitGraphState()

		// Add processes to the network
		for procName, procValue := range descr.Processes {
			net.AddNew(procValue.Component, procName)

			// Process mode detection: a positive pool size takes precedence
			// over the sync flag.
			// NOTE(review): the single-value type assertions below panic if
			// the process is not exactly a *Component - confirm this holds
			// for every registered component.
			switch {
			case procValue.Metadata.PoolSize > 0:
				poolSize := procValue.Metadata.PoolSize
				if poolSize > maxPoolSize {
					poolSize = maxPoolSize
				}
				proc := net.Get(procName).(*Component)
				proc.Mode = ComponentModePool
				proc.PoolSize = uint8(poolSize)
			case procValue.Metadata.Sync:
				proc := net.Get(procName).(*Component)
				proc.Mode = ComponentModeSync
			}
		}

		// Add connections
		for _, conn := range descr.Connections {
			// An entry carrying data is an IIP; anything else is a regular
			// connection between two process ports.
			if conn.Data == nil {
				net.ConnectBuf(conn.Src.Process, conn.Src.Port, conn.Tgt.Process, conn.Tgt.Port, conn.Metadata.Buffer)
			} else {
				net.AddIIP(conn.Data, conn.Tgt.Process, conn.Tgt.Port)
			}
		}

		// Add port exports
		for _, export := range descr.Exports {
			// Split private into proc.port. A missing separator used to
			// surface as an opaque slice-bounds runtime panic.
			dot := strings.Index(export.Private, ".")
			if dot < 0 {
				panic("Private port '" + export.Private + "' is not in 'process.port' form")
			}
			procName, procPort := export.Private[:dot], export.Private[dot+1:]

			// Try to detect port direction using reflection. Guard the type
			// first: Elem and FieldByName panic inside reflect on a nil,
			// non-pointer or non-struct process.
			procType := reflect.TypeOf(net.Get(procName))
			if procType == nil || procType.Kind() != reflect.Ptr || procType.Elem().Kind() != reflect.Struct {
				panic("Process '" + procName + "' of private port '" + export.Private + "' is not a pointer to a struct")
			}
			field, fieldFound := procType.Elem().FieldByName(procPort)
			if !fieldFound {
				panic("Private port '" + export.Private + "' not found")
			}

			// A bidirectional channel satisfies both masks and is mapped as
			// an inport because the receive direction is tested first.
			isChan := field.Type.Kind() == reflect.Chan
			switch {
			case isChan && field.Type.ChanDir()&reflect.RecvDir != 0:
				// It's an inport
				net.MapInPort(export.Public, procName, procPort)
			case isChan && field.Type.ChanDir()&reflect.SendDir != 0:
				// It's an outport
				net.MapOutPort(export.Public, procName, procPort)
			default:
				// It's not a proper port
				panic("Private port '" + export.Private + "' is not a valid channel")
			}
			// TODO add support for subgraphs
		}
		return net
	}

	// Register a component to be reused
	if descr.Properties.Name != "" {
		Register(descr.Properties.Name, constructor)
	}

	return constructor().(*Graph)
}
// LoadJSON reads the JSON graph definition stored in filename and turns it
// into a Graph object that can be run or used in other networks.
//
// It returns nil when the file cannot be read; otherwise it returns whatever
// ParseJSON produces for the file contents.
func LoadJSON(filename string) *Graph {
	if contents, readErr := ioutil.ReadFile(filename); readErr == nil {
		return ParseJSON(contents)
	}
	// The read error is not reported to the caller; nil is the only signal.
	return nil
}
// RegisterJSON registers an external JSON graph definition as a component
// that can be instantiated at run-time using the component factory.
//
// The file at filePath is not read at registration time: it is loaded by
// LoadJSON each time the registered constructor is invoked. If that load
// fails, the constructor's result wraps a nil *Graph.
//
// It returns the result of Register, which is expected to be true on success
// or false if the component name is already taken.
func RegisterJSON(componentName, filePath string) bool {
	var loadGraph ComponentConstructor = func() interface{} {
		return LoadJSON(filePath)
	}
	return Register(componentName, loadGraph)
}