Skip to content

Commit

Permalink
Forced kill of sub-processes if they dont die after a few seconds.
Browse files Browse the repository at this point in the history
  • Loading branch information
janpfeifer committed Apr 6, 2024
1 parent d1473d4 commit a1788d1
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 10 deletions.
4 changes: 3 additions & 1 deletion docs/VSCode.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ go run . --install --logtostderr --vmodule=goexec=2,specialcmd=2,cellmagic=2,gop

## Links of interest

* [VSCode Jupyter Notebooks](https://code.visualstudio.com/docs/datascience/jupyter-notebooks)
* [VSCode Jupyter Notebooks](https://code.visualstudio.com/docs/datascience/jupyter-notebooks)
* [Renderers for Jupyter Notebooks in Visual Studio Code](https://github.com/Microsoft/vscode-notebook-renderers):
presumably adds renderes to several specialized mime-types, including a specialized plotly mime type, that one could take advantage of. I haven't tried it.
28 changes: 28 additions & 0 deletions internal/jpyexec/jpyexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/pkg/errors"
"io"
"k8s.io/klog/v2"
"os"
osexec "os/exec"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -154,6 +156,9 @@ func (exec *Executor) WithStaticInput(stdinContent []byte) *Executor {
return exec
}

// WaitToKill is the to wait after an interrupt signal, before killing the process.
var WaitToKill = 5 * time.Second

// Exec executes the configured New configuration.
//
// It returns an error if it failed to execute or created the pipes -- but not if the executed
Expand Down Expand Up @@ -231,6 +236,26 @@ func (exec *Executor) Exec() error {
return errors.WithMessagef(err, "failed to start to execute command %q", exec.command)
}

var interruptId kernel.SubscriptionId
interruptId = exec.Msg.Kernel().SubscribeInterrupt(func(id kernel.SubscriptionId) {
// Sent interrupt to process.
err := cmd.Process.Signal(os.Interrupt)
exec.Msg.Kernel().UnsubscribeInterrupt(interruptId)
if err != nil {
klog.Errorf("failed to interrupt process %s (%v): %+v", cmd, cmd.Process, err)
}
select {
case <-exec.doneChan:
// Normal stop, nothing to do.
case <-time.After(WaitToKill):
// If process hasn't yet died, kill it.
err = cmd.Process.Signal(syscall.SIGKILL)
if err != nil {
klog.Errorf("failed to kill process %s (%v): %+v", cmd, cmd.Process, err)
}
}
})

if exec.stdinContent != nil {
exec.handleStaticInput()
}
Expand All @@ -245,6 +270,9 @@ func (exec *Executor) Exec() error {
_ = kernel.PublishWriteStream(exec.Msg, kernel.StreamStderr, errMsg)
}

// Unsubscribe from interruption messages.
exec.Msg.Kernel().UnsubscribeInterrupt(interruptId)

klog.V(2).Infof("Execution finished successfully")
// Notice some of the cleanup will happen in parallel after return,
// triggered by the deferred exec.done() that gets executed.
Expand Down
74 changes: 65 additions & 9 deletions internal/kernel/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package kernel

import (
"container/list"
"context"
"crypto/hmac"
"crypto/sha256"
Expand Down Expand Up @@ -103,9 +104,13 @@ type Kernel struct {
// Channel where signals are received.
signalsChan chan os.Signal

// Interrupted indicates whether shell currently being executed was Interrupted.
// Interrupted indicates whether cell/shell currently being executed was Interrupted.
Interrupted atomic.Bool

// InterruptCond gets signaled whenever an interruption happens.
interruptSubscriptions *list.List
muSubscriptions sync.Mutex

// stdinMsg holds the MessageImpl that last asked from input from stdin (MessageImpl.PromptInput).
stdinMsg *MessageImpl
stdinFn OnInputFn // Callback when stdin input is received.
Expand Down Expand Up @@ -189,14 +194,15 @@ func (k *Kernel) HandleInterrupt() {
select {
case sig := <-k.signalsChan:
k.Interrupted.Store(true)
sigName := sig.String()
klog.Infof("Signal %s received.", sigName)
switch sigName {
case "terminated", "hangup":
// These signals should stop the kernel.
klog.Errorf("Signal %s triggers kernel stop.", sigName)
k.Stop()
k.callInterruptSubscribers()
klog.Infof("Signal %s received.", sig)
if sig == os.Interrupt {
// Simply interrupt running cells.
continue
}
// Otherwise stop kernel.
klog.Errorf("Signal %s triggers kernel stop.", sig)
k.Stop()
case <-k.stop:
return // kernel stopped.
}
Expand All @@ -205,6 +211,55 @@ func (k *Kernel) HandleInterrupt() {
}
}

// SubscriptionId is returned by [Kernel.SubscribeInterrupt], and can be used by [Kernel.UnsubscribeInterrupt].
type SubscriptionId *list.Element

// InterruptFn is called on its own goroutine.
type InterruptFn func(id SubscriptionId)

// SubscribeInterrupt registers `fn` to be called if any interruptions occur.
// It returns a [SubscriptionId] that needs to be used to unsubscribe to it later.
func (k *Kernel) SubscribeInterrupt(fn InterruptFn) SubscriptionId {
k.muSubscriptions.Lock()
defer k.muSubscriptions.Unlock()
if klog.V(2).Enabled() {
klog.Infof("SubscribeInterrupt(): %d elements", k.interruptSubscriptions.Len()+1)
}
return k.interruptSubscriptions.PushBack(fn)
}

// UnsubscribeInterrupt stops being called back in case of interruptions.
// It takes the `id` returned by [SubscribeInterrupt].
func (k *Kernel) UnsubscribeInterrupt(id SubscriptionId) {
k.muSubscriptions.Lock()
defer k.muSubscriptions.Unlock()

if id.Value == nil {
// Already unsubscribed.
return
}
id.Value = nil
k.interruptSubscriptions.Remove(id)
if klog.V(2).Enabled() {
klog.Infof("UnsubscribeInterrupt(): %d elements left", k.interruptSubscriptions.Len())
}
}

// callInterruptSubscribers in a separate goroutine each.
// Meant to be called when JupyterServer sends a kernel interrupt (either a SIGINT, or a message to interrupt).
func (k *Kernel) callInterruptSubscribers() {
k.muSubscriptions.Lock()
defer k.muSubscriptions.Unlock()

for e := k.interruptSubscriptions.Front(); e != nil; e = e.Next() {
if e.Value == nil {
continue
}
fn := e.Value.(InterruptFn)
go fn(e) // run on separate goroutine.
}
}

// ExitWait will wait for the kernel to be stopped and all polling
// goroutines to finish.
func (k *Kernel) ExitWait() {
Expand Down Expand Up @@ -272,7 +327,8 @@ func New(connectionFile string) (*Kernel, error) {
stdin: make(chan Message, 1),
control: make(chan Message, 1),

KnownBlockIds: make(common.Set[string]),
interruptSubscriptions: list.New(),
KnownBlockIds: make(common.Set[string]),
}

if matches := reExtractJupyterSessionId.FindStringSubmatch(connectionFile); len(matches) == 2 {
Expand Down

0 comments on commit a1788d1

Please sign in to comment.