Skip to content

Commit

Permalink
refactor context
Browse files Browse the repository at this point in the history
  • Loading branch information
dbarrosop committed Jun 3, 2019
1 parent 6164529 commit ab8fd5a
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 77 deletions.
9 changes: 5 additions & 4 deletions examples/3_grouped_simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package main

import (
"context"
"os"
"sync"

Expand All @@ -18,9 +19,9 @@ import (
type checkMemoryAndCPU struct {
}

func (c checkMemoryAndCPU) Run(ctx gornir.Context, wg *sync.WaitGroup, jobResult chan *gornir.JobResult) {
func (c *checkMemoryAndCPU) Run(ctx context.Context, wg *sync.WaitGroup, taskParameters *gornir.TaskParameters, jobResult chan *gornir.JobResult) {
// We instantiate a new object
result := gornir.NewJobResult(ctx)
result := gornir.NewJobResult(ctx, taskParameters)

defer wg.Done() // flag as completed

Expand All @@ -32,11 +33,11 @@ func (c checkMemoryAndCPU) Run(ctx gornir.Context, wg *sync.WaitGroup, jobResult
swg.Add(2)

// We call the first subtask and store the subresult
(&task.RemoteCommand{Command: "free -m"}).Run(ctx, swg, sr)
(&task.RemoteCommand{Command: "free -m"}).Run(ctx, swg, taskParameters, sr)
result.AddSubResult(<-sr)

// We call the second subtask and store the subresult
(&task.RemoteCommand{Command: "uptime"}).Run(ctx, swg, sr)
(&task.RemoteCommand{Command: "uptime"}).Run(ctx, swg, taskParameters, sr)
result.AddSubResult(<-sr)

jobResult <- result
Expand Down
51 changes: 13 additions & 38 deletions pkg/gornir/context.go
Original file line number Diff line number Diff line change
@@ -1,93 +1,68 @@
package gornir

import (
"context"
"runtime"
"time"

"github.com/google/uuid"
)

// Context implements the context.Context interface and enriches it
// Params implements the context.Params interface and enriches it
// with extra useful information. You will find this object mosty
// in two places; in the JobResult and in objects implementing
// the Task interface
type Context struct {
ctx context.Context
type Params struct {
title string
id string
gr *Gornir
logger Logger
host *Host
}

// NewContext returns a new Context
func NewContext(ctx context.Context, title string, gr *Gornir, logger Logger) Context {
// NewParams returns a new Params
func NewParams(title string, gr *Gornir, logger Logger) Params {
id := uuid.New().String()
return Context{
return Params{
id: id,
gr: gr,
ctx: ctx,
logger: logger.WithField("ID", id),
title: title,
}
}

// ForHost returns a copy of the Context adding the Host to it
func (c Context) ForHost(host *Host) Context {
return Context{
// ForHost returns a copy of the Params adding the Host to it
func (c Params) ForHost(host *Host) Params {
return Params{
id: c.id,
gr: c.gr,
ctx: c.ctx,
logger: c.logger,
title: c.title,
host: host,
}
}

// Title returns the title of the task
func (c Context) Title() string {
func (c Params) Title() string {
return c.title
}

// Gornir returns the Gornir object that triggered the execution of the task
func (c Context) Gornir() *Gornir {
func (c Params) Gornir() *Gornir {
return c.gr
}

// Host returns the Host associated with the context
func (c Context) Host() *Host {
func (c Params) Host() *Host {
return c.host
}

// Deadline delegates the method to the underlying context pass upon creation
func (c Context) Deadline() (time.Time, bool) {
return c.ctx.Deadline()
}

// Done delegates the method to the underlying context pass upon creation
func (c Context) Done() <-chan struct{} {
return c.ctx.Done()
}

// Value delegates the method to the underlying context pass upon creation
func (c Context) Value(key interface{}) interface{} {
return c.ctx.Value(key)
}

// Err will return the error returned by a task. Otherwise it will be nil
func (c Context) Err() error {
return c.ctx.Err()
}

// ID returns the unique ID associated with the execution. All hosts
// will share the same ID for a given Run
func (c Context) ID() string {
func (c Params) ID() string {
return c.id
}

// Logger returns a ready to use Logger
func (c Context) Logger() Logger {
func (c Params) Logger() Logger {
return c.logger.WithField("ID", c.id).WithField("funcName", getFrame(1).Function)
}

Expand Down
26 changes: 18 additions & 8 deletions pkg/gornir/gornir.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,25 @@ type Gornir struct {

// Filter filters the hosts in the inventory returning a copy of the current
// Gornir instance but with only the hosts that passed the filter
func (g *Gornir) Filter(f FilterFunc) *Gornir {
func (gr *Gornir) Filter(f FilterFunc) *Gornir {
return &Gornir{
Inventory: g.Inventory.Filter(g, f),
Logger: g.Logger,
Inventory: gr.Inventory.Filter(gr, f),
Logger: gr.Logger,
}
}

// RunS will execute the task over the hosts in the inventory using the given runner.
// This function will block until all the tasks are completed.
func (g *Gornir) RunS(title string, runner Runner, task Task) (chan *JobResult, error) {
results := make(chan *JobResult, len(g.Inventory.Hosts))
func (gr *Gornir) RunS(title string, runner Runner, task Task) (chan *JobResult, error) {
results := make(chan *JobResult, len(gr.Inventory.Hosts))
err := runner.Run(
NewContext(context.Background(), title, g, g.Logger),
context.Background(),
task,
&TaskParameters{
Title: title,
Gornir: gr,
Logger: gr.Logger,
},
results,
)
if err != nil {
Expand All @@ -44,10 +49,15 @@ func (g *Gornir) RunS(title string, runner Runner, task Task) (chan *JobResult,
// RunA will execute the task over the hosts in the inventory using the given runner.
// This function doesn't block, the user can use the method Runnner.Wait instead.
// It's also up to the user to ennsure the channel is closed
func (g *Gornir) RunA(title string, runner Runner, task Task, results chan *JobResult) error {
func (gr *Gornir) RunA(title string, runner Runner, task Task, results chan *JobResult) error {
err := runner.Run(
NewContext(context.Background(), title, g, g.Logger),
context.Background(), // TODO pass this?
task,
&TaskParameters{
Title: title,
Gornir: gr,
Logger: gr.Logger,
},
results,
)
if err != nil {
Expand Down
51 changes: 38 additions & 13 deletions pkg/gornir/runner.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,68 @@
package gornir

import (
"context"
"sync"
)

type TaskParameters struct {
Title string
Gornir *Gornir
Logger Logger
Host *Host
}

func (t *TaskParameters) ForHost(host *Host) *TaskParameters {
return &TaskParameters{
Title: t.Title,
Gornir: t.Gornir,
Logger: t.Logger,
Host: host,
}
}

// Task is the interface that task plugins need to implement.
// the task is responsible to indicate its completion
// by calling sync.WaitGroup.Done()
type Task interface {
Run(Context, *sync.WaitGroup, chan *JobResult)
Run(context.Context, *sync.WaitGroup, *TaskParameters, chan *JobResult)
}

// Runner is the interface of a struct that can implement a strategy
// to run tasks over hosts
type Runner interface {
Run(Context, Task, chan *JobResult) error // Run executes the task over the hosts
Close() error // Close closes and cleans all objects associated with the runner
Wait() error // Wait blocks until all the hosts are done executing the task
Run(context.Context, Task, *TaskParameters, chan *JobResult) error // Run executes the task over the hosts
Close() error // Close closes and cleans all objects associated with the runner
Wait() error // Wait blocks until all the hosts are done executing the task
}

// JobResult is the result of running a task over a host.
type JobResult struct {
ctx Context
err error
changed bool
data interface{}
subResults []*JobResult
ctx context.Context
taskParameters *TaskParameters
err error
changed bool
data interface{}
subResults []*JobResult
}

// NewJobResult instantiates a new JobResult
func NewJobResult(ctx Context) *JobResult {
return &JobResult{ctx: ctx}
func NewJobResult(ctx context.Context, taskParameters *TaskParameters) *JobResult {
return &JobResult{
ctx: ctx,
taskParameters: taskParameters,
}
}

// Context returns the context associated with the task
func (r *JobResult) Context() Context {
func (r *JobResult) Context() context.Context {
return r.ctx
}

func (r *JobResult) TaskParameters() *TaskParameters {
return r.taskParameters
}

// Err returns the error the task set, otherwise nil
func (r *JobResult) Err() error {
return r.err
Expand All @@ -60,7 +85,7 @@ func (r *JobResult) AnyErr() error {
// SetErr stores the error and also propagates it to the associated Host
func (r *JobResult) SetErr(err error) {
r.err = err
r.Context().Host().setErr(err)
r.TaskParameters().Host.setErr(err)
}

// Changed will return whether the task changed something or not
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/output/text.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func renderResult(wr io.Writer, result *gornir.JobResult, renderHost bool, color
default:
colorFunc = yellow
}
if _, err := wr.Write([]byte(colorFunc(fmt.Sprintf("@ %s\n", result.Context().Host().Hostname), color))); err != nil {
if _, err := wr.Write([]byte(colorFunc(fmt.Sprintf("@ %s\n", result.TaskParameters().Host.Hostname), color))); err != nil {
return err
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func renderResult(wr io.Writer, result *gornir.JobResult, renderHost bool, color
func RenderResults(wr io.Writer, results chan *gornir.JobResult, color bool) error {
r := <-results

title := blue(fmt.Sprintf("# %s\n", r.Context().Title()), color)
title := blue(fmt.Sprintf("# %s\n", r.TaskParameters().Title), color)
if _, err := wr.Write([]byte(title)); err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/plugins/runner/parallel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runner

import (
"context"
"sync"

"github.com/nornir-automation/gornir/pkg/gornir"
Expand All @@ -17,11 +18,11 @@ func Parallel() *ParallelRunner {
}
}

func (r ParallelRunner) Run(ctx gornir.Context, task gornir.Task, results chan *gornir.JobResult) error {
logger := ctx.Logger().WithField("runFunc", getFunctionName(task))
func (r ParallelRunner) Run(ctx context.Context, task gornir.Task, taskParameters *gornir.TaskParameters, results chan *gornir.JobResult) error {
logger := taskParameters.Logger.WithField("runFunc", getFunctionName(task))
logger.Debug("starting runner")

gr := ctx.Gornir()
gr := taskParameters.Gornir
if len(gr.Inventory.Hosts) == 0 {
logger.Warn("no hosts to run against")
return nil
Expand All @@ -30,7 +31,7 @@ func (r ParallelRunner) Run(ctx gornir.Context, task gornir.Task, results chan *

for hostname, host := range gr.Inventory.Hosts {
logger.WithField("host", hostname).Debug("calling function")
go task.Run(ctx.ForHost(host), r.wg, results)
go task.Run(ctx, r.wg, taskParameters.ForHost(host), results)
}
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/plugins/runner/serial.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package runner

import (
"context"
"sort"
"sync"

Expand All @@ -16,11 +17,11 @@ func Sorted() *SortedRunner {
return &SortedRunner{}
}

func (r SortedRunner) Run(ctx gornir.Context, task gornir.Task, results chan *gornir.JobResult) error {
logger := ctx.Logger().WithField("runFunc", getFunctionName(task))
func (r SortedRunner) Run(ctx context.Context, task gornir.Task, taskParameters *gornir.TaskParameters, results chan *gornir.JobResult) error {
logger := taskParameters.Logger.WithField("runFunc", getFunctionName(task))
logger.Debug("starting runner")

gr := ctx.Gornir()
gr := taskParameters.Gornir
if len(gr.Inventory.Hosts) == 0 {
logger.Warn("no hosts to run against")
return nil
Expand All @@ -37,9 +38,9 @@ func (r SortedRunner) Run(ctx gornir.Context, task gornir.Task, results chan *go
sort.Slice(sortedHostnames, func(i, j int) bool { return sortedHostnames[i] < sortedHostnames[j] })

for _, hostname := range sortedHostnames {
host := ctx.Gornir().Inventory.Hosts[hostname]
host := gr.Inventory.Hosts[hostname]
logger.WithField("host", hostname).Debug("calling function")
task.Run(ctx.ForHost(host), wg, results)
task.Run(ctx, wg, taskParameters.ForHost(host), results)
}
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/plugins/task/ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package task

import (
"bytes"
"context"
"fmt"
"sync"

Expand All @@ -22,10 +23,10 @@ type RemoteCommandResults struct {
Stderr []byte // Stderr written by the command
}

func (r *RemoteCommand) Run(ctx gornir.Context, wg *sync.WaitGroup, jobResult chan *gornir.JobResult) {
func (r *RemoteCommand) Run(ctx context.Context, wg *sync.WaitGroup, taskParameters *gornir.TaskParameters, jobResult chan *gornir.JobResult) {
defer wg.Done()
result := gornir.NewJobResult(ctx)
host := ctx.Host()
host := taskParameters.Host
result := gornir.NewJobResult(ctx, taskParameters)

sshConfig := &ssh.ClientConfig{
User: host.Username,
Expand Down

0 comments on commit ab8fd5a

Please sign in to comment.