diff --git a/_examples/basic_async.go b/_examples/basic_async.go new file mode 100644 index 00000000..d33c3052 --- /dev/null +++ b/_examples/basic_async.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/Shopify/zk" +) + +func main() { + c, _, err := zk.Connect([]string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}, time.Second) //*10) + if err != nil { + panic(err) + } + err = c.AddWatchCtxAsync(context.Background(), "/", true, func(ctx context.Context, e zk.Event) { + fmt.Printf("Got event: %+v\n", e) + c.GetCtxAsync(ctx, e.Path, func(ctx context.Context, b []byte, s *zk.Stat, err error) { + if err != nil { + fmt.Printf("%s err => %+v\n", e.Path, err) + } else { + fmt.Printf("%s (%+v): %+v\n", e.Path, s, b) + } + }) + }) + if err != nil { + panic(err) + } + for { + time.Sleep(time.Second) + } +} diff --git a/_examples/tree_walker_async.go b/_examples/tree_walker_async.go new file mode 100644 index 00000000..530fe142 --- /dev/null +++ b/_examples/tree_walker_async.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "fmt" + "path" + "time" + + "github.com/Shopify/zk" +) + +var ( + zkConn = struct{}{} +) + +func walk(ctx context.Context, parent string, childrens []string, stat *zk.Stat, err error) { + if ctx.Err() != nil || err != nil { + fmt.Printf("%s err1 => %+v\n", parent, err) + return + } + c := ctx.Value(zkConn).(*zk.Conn) + + if stat.NumChildren == 0 { + c.GetCtxAsync(ctx, parent, func(ctx context.Context, b []byte, s *zk.Stat, err error) { + if err != nil { + fmt.Printf("%s err2 => %+v\n", parent, err) + } else { + fmt.Printf("Leaf %s => %+v\n", parent, b) + } + }) + } else { + for _, child := range childrens { + c.ChildrenCtxAsync(ctx, path.Join(parent, child), walk) + } + } +} + +func main() { + c, _, err := zk.Connect([]string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}, time.Second) //*10) + if err != nil { + panic(err) + } + + ctx := context.WithValue(context.Background(), zkConn, c) + c.ChildrenCtxAsync(ctx, "/", walk) + for { + time.Sleep(time.Second) + } +} diff --git a/conn.go b/conn.go index 175e2769..3f8e3f81 100644 --- a/conn.go +++ b/conn.go @@ -1086,6 +1086,20 @@ func (c *Conn) AddAuthCtx(ctx context.Context, scheme string, auth []byte) error return nil } +func (c *Conn) AddWatchCtxAsync(ctx context.Context, path string, recursive bool, callback func(context.Context, Event)) error { + ch, err := c.AddWatchCtx(ctx, path, recursive) + if err != nil { + return err + } + + go func() { + for e := range ch { + callback(ctx, e) + } + }() + return nil +} + // AddWatch creates a persistent (optionally recursive) watch at the given path. func (c *Conn) AddWatch(path string, recursive bool) (<-chan Event, error) { return c.AddWatchCtx(context.Background(), path, recursive) @@ -1143,6 +1157,13 @@ func (c *Conn) RemoveWatchCtx(ctx context.Context, ech <-chan Event) error { return nil } +func (c *Conn) ChildrenCtxAsync(ctx context.Context, path string, callback func(context.Context, string, []string, *Stat, error)) { + go func() { + childrens, stat, err := c.ChildrenCtx(ctx, path) + callback(ctx, path, childrens, stat, err) + }() +} + // Children returns the children of a znode. func (c *Conn) Children(path string) ([]string, *Stat, error) { return c.ChildrenCtx(context.Background(), path) @@ -1186,6 +1207,13 @@ func (c *Conn) ChildrenWCtx(ctx context.Context, path string) ([]string, *Stat, return res.Children, &res.Stat, ech, err } +func (c *Conn) GetCtxAsync(ctx context.Context, path string, callback func(context.Context, []byte, *Stat, error)) { + go func() { + data, stat, err := c.GetCtx(ctx, path) + callback(ctx, data, stat, err) + }() +} + // Get gets the contents of a znode. func (c *Conn) Get(path string) ([]byte, *Stat, error) { return c.GetCtx(context.Background(), path)