From 08fc28b38fd69f3b680a3b235f78a24478c836f3 Mon Sep 17 00:00:00 2001 From: Will Rouesnel Date: Fri, 1 Mar 2019 15:17:23 +1100 Subject: [PATCH] Add streaming command support. Add options - `stream-stdout-in-response` - `stream-stdout-in-response-on-error` - `stream-command-kill-grace-period-seconds` to allow defining webhooks which dynamically stream large content back to the requestor. This allows the creation of download endpoints from scripts, i.e. running a `git archive` command or a database dump from a docker container, without needing to buffer up the original. --- docs/Hook-Definition.md | 3 + hook/hook.go | 3 + test/hookecho.go | 61 ++++- test/hooks.json.tmpl | 31 +++ test/hooks.yaml.tmpl | 21 +- test/hookstream/hookstream.go | 107 +++++++++ webhook.go | 419 ++++++++++++++++++++++++++-------- webhook_test.go | 13 +- 8 files changed, 547 insertions(+), 111 deletions(-) create mode 100644 test/hookstream/hookstream.go diff --git a/docs/Hook-Definition.md b/docs/Hook-Definition.md index c248a779..1ecf1ca9 100644 --- a/docs/Hook-Definition.md +++ b/docs/Hook-Definition.md @@ -10,6 +10,9 @@ Hooks are defined as JSON objects. Please note that in order to be considered va * `response-headers` - specifies the list of headers in format `{"name": "X-Example-Header", "value": "it works"}` that will be returned in HTTP response for the hook * `include-command-output-in-response` - boolean whether webhook should wait for the command to finish and return the raw output as a response to the hook initiator. If the command fails to execute or encounters any errors while executing the response will result in 500 Internal Server Error HTTP status code, otherwise the 200 OK status code will be returned. * `include-command-output-in-response-on-error` - boolean whether webhook should include command stdout & stderror as a response in failed executions. It only works if `include-command-output-in-response` is set to `true`. + * `stream-stdout-in-response` - boolean (exclusive with `include-command-output-in-response` and `include-command-output-in-response-on-error`) that will stream the output of a command in the response if the command writes any data to standard output before exiting non-zero. + * `stream-stderr-in-response-on-error` - boolean whether the webhook should send the stream of stderr on error. Only effective if `stream-stdout-in-response` is being used. + * `stream-command-kill-grace-period-seconds` - float number of seconds to wait after trying to kill a stream command with SIGTERM before sending SIGKILL. Default is 0 (do not wait). * `parse-parameters-as-json` - specifies the list of arguments that contain JSON strings. These parameters will be decoded by webhook and you can access them like regular objects in rules and `pass-arguments-to-command`. * `pass-arguments-to-command` - specifies the list of arguments that will be passed to the command. Check [Referencing request values page](Referencing-Request-Values.md) to see how to reference the values from the request. If you want to pass a static string value to your command you can specify it as `{ "source": "string", "name": "argumentvalue" }` diff --git a/hook/hook.go b/hook/hook.go index 721b7313..cfc99789 100644 --- a/hook/hook.go +++ b/hook/hook.go @@ -420,6 +420,9 @@ type Hook struct { ResponseHeaders ResponseHeaders `json:"response-headers,omitempty"` CaptureCommandOutput bool `json:"include-command-output-in-response,omitempty"` CaptureCommandOutputOnError bool `json:"include-command-output-in-response-on-error,omitempty"` + StreamCommandStdout bool `json:"stream-stdout-in-response,omitempty"` + StreamCommandStderrOnError bool `json:"stream-stderr-in-response-on-error,omitempty"` + StreamCommandKillGraceSecs float64 `json:"stream-command-kill-grace-period-seconds,omitempty"` PassEnvironmentToCommand []Argument `json:"pass-environment-to-command,omitempty"` PassArgumentsToCommand []Argument `json:"pass-arguments-to-command,omitempty"` PassFileToCommand []Argument `json:"pass-file-to-command,omitempty"` diff --git a/test/hookecho.go b/test/hookecho.go index 6e5e9f7b..90787017 100644 --- a/test/hookecho.go +++ b/test/hookecho.go @@ -5,13 +5,58 @@ package main import ( "fmt" "os" - "strconv" "strings" + "strconv" + "io" ) +func checkPrefix(prefixMap map[string]struct{}, prefix string, arg string) bool { + if _, found := prefixMap[prefix]; found { + fmt.Printf("prefix specified more then once: %s", arg) + os.Exit(-1) + } + + if strings.HasPrefix(arg, prefix) { + prefixMap[prefix] = struct{}{} + return true + } + + return false +} + func main() { + var outputStream io.Writer + outputStream = os.Stdout + seenPrefixes := make(map[string]struct{}) + exit_code := 0 + + for _, arg := range os.Args[1:] { + if checkPrefix(seenPrefixes, "stream=", arg) { + switch arg { + case "stream=stdout": + outputStream = os.Stdout + case "stream=stderr": + outputStream = os.Stderr + case "stream=both": + outputStream = io.MultiWriter(os.Stdout, os.Stderr) + default: + fmt.Printf("unrecognized stream specification: %s", arg) + os.Exit(-1) + } + } else if checkPrefix(seenPrefixes, "exit=", arg) { + exit_code_str := arg[5:] + var err error + exit_code_conv, err := strconv.Atoi(exit_code_str) + exit_code = exit_code_conv + if err != nil { + fmt.Printf("Exit code %s not an int!", exit_code_str) + os.Exit(-1) + } + } + } + if len(os.Args) > 1 { - fmt.Printf("arg: %s\n", strings.Join(os.Args[1:], " ")) + fmt.Fprintf(outputStream, "arg: %s\n", strings.Join(os.Args[1:], " ")) } var env []string @@ -22,16 +67,8 @@ func main() { } if len(env) > 0 { - fmt.Printf("env: %s\n", strings.Join(env, " ")) + fmt.Fprintf(outputStream, "env: %s\n", strings.Join(env, " ")) } - if (len(os.Args) > 1) && (strings.HasPrefix(os.Args[1], "exit=")) { - exit_code_str := os.Args[1][5:] - exit_code, err := strconv.Atoi(exit_code_str) - if err != nil { - fmt.Printf("Exit code %s not an int!", exit_code_str) - os.Exit(-1) - } - os.Exit(exit_code) - } + os.Exit(exit_code) } diff --git a/test/hooks.json.tmpl b/test/hooks.json.tmpl index 2fd90da6..97923700 100644 --- a/test/hooks.json.tmpl +++ b/test/hooks.json.tmpl @@ -204,5 +204,36 @@ "name": "passed" } ], + }, + { + "id": "stream-stdout-in-response", + "pass-arguments-to-command": [ + { + "source": "string", + "name": "exit=0" + }, + { + "source": "string", + "name": "stream=both" + } + ], + "execute-command": "{{ .Hookecho }}", + "stream-stdout-in-response": true + }, + { + "id": "stream-stderr-in-response-on-error", + "pass-arguments-to-command": [ + { + "source": "string", + "name": "exit=1" + }, + { + "source": "string", + "name": "stream=stderr" + } + ], + "execute-command": "{{ .Hookecho }}", + "stream-stdout-in-response": true, + "stream-stderr-in-response-on-error": true } ] diff --git a/test/hooks.yaml.tmpl b/test/hooks.yaml.tmpl index c2fffcd0..6d6cd3b9 100644 --- a/test/hooks.yaml.tmpl +++ b/test/hooks.yaml.tmpl @@ -113,4 +113,23 @@ - id: warn-on-space execute-command: '{{ .Hookecho }} foo' - include-command-output-in-response: true \ No newline at end of file + include-command-output-in-response: true + +- id: stream-stdout-in-response + execute-command: '{{ .Hookecho }}' + stream-stdout-in-response: true + pass-arguments-to-command: + - source: string + name: exit=0 + - source: string + name: stream=both + +- id: stream-stderr-in-response-on-error + execute-command: '{{ .Hookecho }}' + stream-stdout-in-response: true + stream-stderr-in-response-on-error: true + pass-arguments-to-command: + - source: string + name: exit=1 + - source: string + name: stream=stderr diff --git a/test/hookstream/hookstream.go b/test/hookstream/hookstream.go new file mode 100644 index 00000000..474b39c4 --- /dev/null +++ b/test/hookstream/hookstream.go @@ -0,0 +1,107 @@ +// Hook Stream is a simple utility for testing Webhook streaming capability. It spawns a TCP server on execution +// which echos all connections to its stdout until it receives the string EOF. + +package main + +import ( + "fmt" + "os" + "strings" + "strconv" + "io" + "net" + "bufio" +) + +func checkPrefix(prefixMap map[string]struct{}, prefix string, arg string) bool { + if _, found := prefixMap[prefix]; found { + fmt.Printf("prefix specified more then once: %s", arg) + os.Exit(-1) + } + + if strings.HasPrefix(arg, prefix) { + prefixMap[prefix] = struct{}{} + return true + } + + return false +} + +func main() { + var outputStream io.Writer + outputStream = os.Stdout + seenPrefixes := make(map[string]struct{}) + exit_code := 0 + + for _, arg := range os.Args[1:] { + if checkPrefix(seenPrefixes, "stream=", arg) { + switch arg { + case "stream=stdout": + outputStream = os.Stdout + case "stream=stderr": + outputStream = os.Stderr + case "stream=both": + outputStream = io.MultiWriter(os.Stdout, os.Stderr) + default: + fmt.Printf("unrecognized stream specification: %s", arg) + os.Exit(-1) + } + } else if checkPrefix(seenPrefixes, "exit=", arg) { + exit_code_str := arg[5:] + var err error + exit_code_conv, err := strconv.Atoi(exit_code_str) + exit_code = exit_code_conv + if err != nil { + fmt.Printf("Exit code %s not an int!", exit_code_str) + os.Exit(-1) + } + } + } + + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + fmt.Printf("Error starting tcp server: %v\n", err) + os.Exit(-1) + } + defer l.Close() + + // Emit the address of the server + fmt.Printf("%v\n",l.Addr()) + + manageCh := make(chan struct{}) + + go func() { + for { + conn, err := l.Accept() + if err != nil { + fmt.Printf("Error accepting connection: %v\n", err) + os.Exit(-1) + } + go handleRequest(manageCh, outputStream, conn) + } + }() + + <- manageCh + l.Close() + + os.Exit(exit_code) +} + +// Handles incoming requests. +func handleRequest(manageCh chan<- struct{}, w io.Writer, conn net.Conn) { + defer conn.Close() + bio := bufio.NewScanner(conn) + for bio.Scan() { + if line := strings.TrimSuffix(bio.Text(), "\n"); line == "EOF" { + // Request program close + select { + case manageCh <- struct{}{}: + // Request sent. + default: + // Already closing + } + break + } + fmt.Fprintf(w, "%s\n", bio.Text()) + } +} \ No newline at end of file diff --git a/webhook.go b/webhook.go index 5d410775..2005f0de 100644 --- a/webhook.go +++ b/webhook.go @@ -20,6 +20,11 @@ import ( "github.com/satori/go.uuid" fsnotify "gopkg.in/fsnotify.v1" + "io" + "bytes" + "syscall" + "context" + "bufio" ) const ( @@ -199,131 +204,289 @@ func main() { } -func hookHandler(w http.ResponseWriter, r *http.Request) { +func lineReader(rdr io.Reader, out io.Writer) <-chan struct{} { + done := make(chan struct{}) + go func() { + scanner := bufio.NewScanner(rdr) + for scanner.Scan() { + fmt.Fprintf(out, "%s\n", scanner.Text()) + } + close(done) + }() + return done +} + +// combinedOutput simply reads two streams until they terminate and returns the result as a string. +func combinedOutput(stdout io.Reader, stderr io.Reader) string { + outStream := bytes.NewBuffer(nil) + + stdoutdone := lineReader(stdout, outStream) + stderrdone := lineReader(stderr, outStream) + + // Order doesn't matter here, we just need both to finish + <-stdoutdone + <-stderrdone + + return outStream.String() +} +func hookHandler(w http.ResponseWriter, r *http.Request) { // generate a request id for logging rid := uuid.NewV4().String()[:6] log.Printf("[%s] incoming HTTP request from %s\n", rid, r.RemoteAddr) - for _, responseHeader := range responseHeaders { - w.Header().Set(responseHeader.Name, responseHeader.Value) + id := mux.Vars(r)["id"] + + matchedHook := matchLoadedHook(id) + + // Exit early if no hook matches + if matchedHook == nil { + w.WriteHeader(http.StatusNotFound) + fmt.Fprintf(w, "Hook not found.") + return } - id := mux.Vars(r)["id"] - if matchedHook := matchLoadedHook(id); matchedHook != nil { - log.Printf("[%s] %s got matched\n", rid, id) + log.Printf("[%s] %s got matched\n", rid, id) - body, err := ioutil.ReadAll(r.Body) - if err != nil { - log.Printf("[%s] error reading the request body. %+v\n", rid, err) - } + body, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Printf("[%s] error reading the request body. %+v\n", rid, err) + } - // parse headers - headers := valuesToMap(r.Header) + // parse headers + headers := valuesToMap(r.Header) - // parse query variables - query := valuesToMap(r.URL.Query()) + // parse query variables + query := valuesToMap(r.URL.Query()) - // parse body - var payload map[string]interface{} + // parse body + var payload map[string]interface{} - // set contentType to IncomingPayloadContentType or header value - contentType := r.Header.Get("Content-Type") - if len(matchedHook.IncomingPayloadContentType) != 0 { - contentType = matchedHook.IncomingPayloadContentType - } + contentType := r.Header.Get("Content-Type") - if strings.Contains(contentType, "json") { - decoder := json.NewDecoder(strings.NewReader(string(body))) - decoder.UseNumber() + if strings.Contains(contentType, "json") { + decoder := json.NewDecoder(strings.NewReader(string(body))) + decoder.UseNumber() - err := decoder.Decode(&payload) + err := decoder.Decode(&payload) - if err != nil { - log.Printf("[%s] error parsing JSON payload %+v\n", rid, err) - } - } else if strings.Contains(contentType, "form") { - fd, err := url.ParseQuery(string(body)) - if err != nil { - log.Printf("[%s] error parsing form payload %+v\n", rid, err) - } else { - payload = valuesToMap(fd) - } + if err != nil { + log.Printf("[%s] error parsing JSON payload %+v\n", rid, err) } + } else if strings.Contains(contentType, "form") { + fd, err := url.ParseQuery(string(body)) + if err != nil { + log.Printf("[%s] error parsing form payload %+v\n", rid, err) + } else { + payload = valuesToMap(fd) + } + } - // handle hook - errors := matchedHook.ParseJSONParameters(&headers, &query, &payload) + // handle hook + if errors := matchedHook.ParseJSONParameters(&headers, &query, &payload); errors != nil { for _, err := range errors { log.Printf("[%s] error parsing JSON parameters: %s\n", rid, err) } + } - var ok bool + if matchedHook.TriggerRule != nil { + ok, err := matchedHook.TriggerRule.Evaluate(&headers, &query, &payload, &body, r.RemoteAddr) + if err != nil { + msg := fmt.Sprintf("[%s] error evaluating hook: %s", rid, err) + log.Printf(msg) + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Error occurred while evaluating hook rules.") + return + } - if matchedHook.TriggerRule == nil { - ok = true - } else { - ok, err = matchedHook.TriggerRule.Evaluate(&headers, &query, &payload, &body, r.RemoteAddr) - if err != nil { - msg := fmt.Sprintf("[%s] error evaluating hook: %s", rid, err) - log.Print(msg) - w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, "Error occurred while evaluating hook rules.") - return + // Trigger rules did not evaluate. Handle an error. + if !ok { + // Check if a return code is configured for the hook + if matchedHook.TriggerRuleMismatchHttpResponseCode != 0 { + // Check if the configured return code is supported by the http package + // by testing if there is a StatusText for this code. + if len(http.StatusText(matchedHook.TriggerRuleMismatchHttpResponseCode)) > 0 { + w.WriteHeader(matchedHook.TriggerRuleMismatchHttpResponseCode) + } else { + log.Printf("[%s] %s got matched, but the configured return code %d is unknown - defaulting to 200\n", rid, matchedHook.ID, matchedHook.TriggerRuleMismatchHttpResponseCode) + } } + + // if none of the hooks got triggered + log.Printf("[%s] %s got matched, but didn't get triggered because the trigger rules were not satisfied\n", rid, matchedHook.ID) + + fmt.Fprintf(w, "Hook rules were not satisfied.") + + // Bail. + return } + } - if ok { - log.Printf("[%s] %s hook triggered successfully\n", rid, matchedHook.ID) + // Rule evaluated successfully by this point and will be triggered. - for _, responseHeader := range matchedHook.ResponseHeaders { - w.Header().Set(responseHeader.Name, responseHeader.Value) - } + log.Printf("[%s] %s hook triggered successfully\n", rid, matchedHook.ID) - if matchedHook.CaptureCommandOutput { - response, err := handleHook(matchedHook, rid, &headers, &query, &payload, &body) + // if a regular style webhook, use a background context since we want it to run till it's done. + // if a streaming webhook, we want to enforce it dies if the user disconnects since it's liable to + // block forever otherwise. + var ctx context.Context + if matchedHook.StreamCommandStdout { + ctx = r.Context() + } else { + ctx = context.Background() + } - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - if matchedHook.CaptureCommandOutputOnError { - fmt.Fprintf(w, response) - } else { - w.Header().Set("Content-Type", "text/plain; charset=utf-8") - fmt.Fprintf(w, "Error occurred while executing the hook's command. Please check your logs for more details.") + stdoutRdr, stderrRdr, errCh := handleHook(ctx, matchedHook, rid, &headers, &query, &payload, &body) + + if matchedHook.StreamCommandStdout { + log.Printf("[%s] Hook (%s) is a streaming command hook\n", rid, matchedHook.ID) + // Collect stderr to avoid blocking processes and emit it as a string + stderrRdy := make(chan string, 1) + go func() { + stderrOut := bytes.NewBuffer(nil) + n, err := io.Copy(stderrOut, stderrRdr) + if err != nil { + log.Printf("[%s] Hook error while collecting stderr\n", rid) + } + log.Printf("[%s] Hook logged %d bytes of stderr data\n", rid, n) + stderrStr := stderrOut.String() + log.Printf("[%s] command stderr: %s\n", rid, stderrStr) + stderrRdy <- stderrStr + close(stderrRdy) + }() + + // Streaming output should commence as soon as the command execution tries to write any data + firstByte := make([]byte,1) + _, fbErr := stdoutRdr.Read(firstByte) + if fbErr != nil && fbErr != io.EOF { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "Error occurred while trying to read from the process's first byte. Please check your logs for more details.") + log.Printf("[%s] Hook error while reading first byte: %v\n", rid, err) + return + } else if fbErr == io.EOF { + log.Printf("[%s] EOF from hook stdout while reading first byte. Waiting for program exit status\n", rid) + if err := <- errCh; err != nil { + log.Printf("[%s] Hook (%s) returned an error before the first byte. Collecting stderr and failing.\n", rid, matchedHook.ID) + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusInternalServerError) + if matchedHook.StreamCommandStderrOnError { + // Wait for the stderr buffer to finish collecting + if n, err := fmt.Fprint(w, <- stderrRdy); err != nil { + msg := fmt.Sprintf("[%s] error while writing user response message (after %d bytes): %s", rid, n, err) + log.Printf(msg) + return } } else { - fmt.Fprintf(w, response) + fmt.Fprintf(w, "Error occurred while executing the hooks command. Please check your logs for more details.") } - } else { - go handleHook(matchedHook, rid, &headers, &query, &payload, &body) - fmt.Fprintf(w, matchedHook.ResponseMessage) + return // Cannot proceed beyond here } + // early EOF, but program exited successfully so stream as normal. + } + + // Write user success headers + for _, responseHeader := range matchedHook.ResponseHeaders { + w.Header().Set(responseHeader.Name, responseHeader.Value) + } + + // Got the first byte (or possibly nothing) successfully. Write the success header, then commence + // streaming. + w.WriteHeader(http.StatusOK) + + if _, err := w.Write(firstByte); err != nil { + // Hard fail, client has disconnected or otherwise we can't continue. + msg := fmt.Sprintf("[%s] error while trying to stream first byte: %s", rid, err) + log.Printf(msg) return } - // Check if a return code is configured for the hook - if matchedHook.TriggerRuleMismatchHttpResponseCode != 0 { - // Check if the configured return code is supported by the http package - // by testing if there is a StatusText for this code. - if len(http.StatusText(matchedHook.TriggerRuleMismatchHttpResponseCode)) > 0 { - w.WriteHeader(matchedHook.TriggerRuleMismatchHttpResponseCode) - } else { - log.Printf("[%s] %s got matched, but the configured return code %d is unknown - defaulting to 200\n", rid, matchedHook.ID, matchedHook.TriggerRuleMismatchHttpResponseCode) - } + n, err := io.Copy(w, stdoutRdr) + if err != nil { + msg := fmt.Sprintf("[%s] error while streaming command output (after %d bytes): %s", rid, n, err) + log.Printf(msg) + return } - // if none of the hooks got triggered - log.Printf("[%s] %s got matched, but didn't get triggered because the trigger rules were not satisfied\n", rid, matchedHook.ID) + msg := fmt.Sprintf("[%s] Streamed %d bytes", rid, n) + log.Printf(msg) - fmt.Fprintf(w, "Hook rules were not satisfied.") } else { - w.WriteHeader(http.StatusNotFound) - fmt.Fprintf(w, "Hook not found.") + log.Printf("[%s] Hook (%s) is a conventional command hook\n", rid, matchedHook.ID) + // Don't break the original API and just combine the streams (specifically, kick off two readers which + // break on newlines and the emit that data in temporal order to the output buffer. + out := combinedOutput(stdoutRdr, stderrRdr) + + log.Printf("[%s] command output: %s\n", rid, out) + + err := <-errCh + + log.Printf("[%s] got command execution result: %v", rid, err) + + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + } else { + for _, responseHeader := range matchedHook.ResponseHeaders { + w.Header().Set(responseHeader.Name, responseHeader.Value) + } + w.WriteHeader(http.StatusOK) + } + + if matchedHook.CaptureCommandOutput { + if matchedHook.CaptureCommandOutputOnError || err == nil { + // Send output if send output on error or no error + if n, err := fmt.Fprint(w, out); err != nil { + msg := fmt.Sprintf("[%s] error while writing command output (after %d bytes): %s", rid, n, err) + log.Printf(msg) + return + } + } else if !matchedHook.CaptureCommandOutputOnError && err != nil { + // Have an error but not allowed to send output - send error message + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + if n, err := fmt.Fprintf(w, "Error occurred while executing the hook's command. Please check your logs for more details."); err != nil { + msg := fmt.Sprintf("[%s] error while writing error message (after %d bytes): %s", rid, n, err) + log.Printf(msg) + return + } + } + } else { + // Not capturing command output + if err != nil { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + if n, err := fmt.Fprintf(w, "Error occurred while executing the hook's command. Please check your logs for more details."); err != nil { + msg := fmt.Sprintf("[%s] error while writing user response message (after %d bytes): %s", rid, n, err) + log.Printf(msg) + return + } + } else { + // Ignore all command output and send the response message + if n, err := fmt.Fprint(w, matchedHook.ResponseMessage); err != nil { + msg := fmt.Sprintf("[%s] error while writing user response message (after %d bytes): %s", rid, n, err) + log.Printf(msg) + return + } + } + } } } -func handleHook(h *hook.Hook, rid string, headers, query, payload *map[string]interface{}, body *[]byte) (string, error) { +// errDispatch is a helper to non-blockingly send a single error to a waiting channel and then close it +func errDispatch(err error) <-chan error { + errCh := make(chan error) + go func() { + errCh <- err + close(errCh) + }() + return errCh +} + +// handleHook sets up and start the hook command, returning readers for stdout and stderr, +// a channel to return the command result on +func handleHook(ctx context.Context, h *hook.Hook, rid string, headers, query, payload *map[string]interface{}, +body *[]byte) (io.Reader, io.Reader, <-chan error) { + var errors []error // check the command exists @@ -337,7 +500,7 @@ func handleHook(h *hook.Hook, rid string, headers, query, payload *map[string]in log.Printf("use 'pass-arguments-to-command' to specify args for '%s'", s) } - return "", err + return bytes.NewBufferString(""), bytes.NewBufferString(""), errDispatch(err) } cmd := exec.Command(cmdPath) @@ -348,6 +511,10 @@ func handleHook(h *hook.Hook, rid string, headers, query, payload *map[string]in log.Printf("[%s] error extracting command arguments: %s\n", rid, err) } + for _, err := range errors { + log.Printf("[%s] error setting up command pipes: %s\n", rid, err) + } + var envs []string envs, errors = h.ExtractCommandArgumentsForEnv(headers, query, payload) @@ -383,29 +550,89 @@ func handleHook(h *hook.Hook, rid string, headers, query, payload *map[string]in cmd.Env = append(os.Environ(), envs...) - log.Printf("[%s] executing %s (%s) with arguments %q and environment %s using %s as cwd\n", rid, h.ExecuteCommand, cmd.Path, cmd.Args, envs, cmd.Dir) + // Setup stdout and stderr pipe + stdout, err := cmd.StdoutPipe() + if err != nil { + return bytes.NewBufferString(""), bytes.NewBufferString(""), errDispatch(err) + } - out, err := cmd.CombinedOutput() + stderr, err := cmd.StderrPipe() + if err != nil { + return bytes.NewBufferString(""), bytes.NewBufferString(""), errDispatch(err) + } - log.Printf("[%s] command output: %s\n", rid, out) + log.Printf("[%s] executing %s (%s) with arguments %q and environment %s using %s as cwd\n", rid, h.ExecuteCommand, cmd.Path, cmd.Args, envs, cmd.Dir) - if err != nil { - log.Printf("[%s] error occurred: %+v\n", rid, err) + // Attempt to start the command... + if err := cmd.Start(); err != nil { + log.Printf("[%s] error occurred on command start: %+v\n", rid, err) + return bytes.NewBufferString(""), bytes.NewBufferString(""), errDispatch(err) } - for i := range files { - if files[i].File != nil { - log.Printf("[%s] removing file %s\n", rid, files[i].File.Name()) - err := os.Remove(files[i].File.Name()) - if err != nil { - log.Printf("[%s] error removing file %s [%s]", rid, files[i].File.Name(), err) + // From this point on we need to actually wait to emit the error + errCh := make(chan error) + doneCh := make(chan struct{}) + + // Spawn a goroutine to wait for the command to end supply errors + go func() { + resultErr := cmd.Wait() + close(doneCh) // Close the doneCh immediately so handlers exit correctly. + if resultErr != nil { + log.Printf("[%s] error occurred: %+v\n", rid, resultErr) + } + + for i := range files { + if files[i].File != nil { + log.Printf("[%s] removing file %s\n", rid, files[i].File.Name()) + err := os.Remove(files[i].File.Name()) + if err != nil { + log.Printf("[%s] error removing file %s [%s]", rid, files[i].File.Name(), err) + } } } - } - log.Printf("[%s] finished handling %s\n", rid, h.ID) + log.Printf("[%s] finished handling: %s\n", rid, h.ID) + + errCh <- resultErr + close(errCh) + }() + + // Spawn a goroutine which checks if the context is ever cancelled, and sends SIGTERM / SIGKILL if it is + go func() { + ctxDone := ctx.Done() + + select { + case <- ctxDone: + log.Printf("[%s] Context done (request finished) - killing process.", rid) + // AFAIK this works on Win/Mac/Unix - where does it not work? + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + log.Printf("[%s] error sending SIGTERM to process for %s: %s\n", rid, h.ID, err) + } else { + log.Printf("[%s] Context cancelled sending SIGTERM to process for %s\n", rid, h.ID) + } + case <- doneCh: + // Process has exited, this isn't needed anymore. + return + } + + // Process may still be alive, so wait the grace period and then send SIGKILL. + select { + case <- doneCh: + // Process exited after timeout - nothing to do. + return + case <- time.After( time.Duration(float64(time.Second) * h.StreamCommandKillGraceSecs) ): + // Timeout beat process exit. Send kill! + log.Printf("[%s] Sending SIGKILL to process for %s after grace period of %f seconds\n", rid, h.ID, h.StreamCommandKillGraceSecs) + if err := cmd.Process.Kill(); err != nil { + log.Printf("[%s] error sending SIGKILL to process for %s: %s\n", rid, h.ID, err) + } else { + log.Printf("[%s] Sent SIGKILL to process for %s\n", rid, h.ID) + } + } + // Nothing left to do. Everything should be dead now. + }() - return string(out), err + return stdout, stderr, errCh } func reloadHooks(hooksFilePath string) { diff --git a/webhook_test.go b/webhook_test.go index 6113b9b6..1a480323 100644 --- a/webhook_test.go +++ b/webhook_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/adnanh/webhook/hook" + "context" ) func TestStaticParams(t *testing.T) { @@ -53,11 +54,15 @@ func TestStaticParams(t *testing.T) { b := &bytes.Buffer{} log.SetOutput(b) - _, err = handleHook(spHook, "test", &spHeaders, &map[string]interface{}{}, &map[string]interface{}{}, &[]byte{}) + stdout, stderr, errCh := handleHook(context.Background(), spHook, "test", &spHeaders, &map[string]interface{}{}, &map[string]interface{}{}, &[]byte{}) + + combinedOutput(stdout, stderr) + err = <- errCh + if err != nil { t.Fatalf("Unexpected error: %v\n", err) } - matched, _ := regexp.MatchString("(?s)command output: .*static-params-name-space", b.String()) + matched, _ := regexp.MatchString("(?s)finished handling: .*static-params-name-space", b.String()) if !matched { t.Fatalf("Unexpected log output:\n%sn", b) } @@ -618,6 +623,10 @@ env: HOOK_head_commit.timestamp=2013-03-12T08:14:29-07:00 // Check logs {"static params should pass", "static-params-ok", nil, `{}`, false, http.StatusOK, "arg: passed\n", `(?s)command output: arg: passed`}, {"command with space logs warning", "warn-on-space", nil, `{}`, false, http.StatusInternalServerError, "Error occurred while executing the hook's command. Please check your logs for more details.", `(?s)unable to locate command.*use 'pass[-]arguments[-]to[-]command' to specify args`}, + + // Streaming commands + {"streaming response yields stdout only", "stream-stdout-in-response", nil, `{}`, false, http.StatusOK, "arg: exit=0 stream=both\n",``}, + {"streaming response yields stdout only", "stream-stdout-in-response", nil, `{}`, false, http.StatusOK, "arg: exit=0 stream=both\n",``}, } // buffer provides a concurrency-safe bytes.Buffer to tests above.