Skip to content

Commit

Permalink
Merge pull request #1465 from dcantah/hpc-storagefix
Browse files Browse the repository at this point in the history
[release/0.9] Backport hostprocess fixes
  • Loading branch information
dcantah authored Jul 22, 2022
2 parents 50b68e6 + 86ef5a6 commit e6107b7
Show file tree
Hide file tree
Showing 20 changed files with 400 additions and 216 deletions.
28 changes: 14 additions & 14 deletions internal/jobcontainers/jobcontainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ func splitArgs(cmdLine string) []string {
return r.FindAllString(cmdLine, -1)
}

// envMapToSlice flattens an environment map into a slice of "KEY=value"
// entries, e.g. [Key1=val1, key2=val2]. Entries appear in map iteration
// order, which Go leaves unspecified. A nil or empty map yields a nil slice.
func envMapToSlice(m map[string]string) []string {
	var entries []string
	for name := range m {
		entries = append(entries, name+"="+m[name])
	}
	return entries
}

const (
jobContainerNameFmt = "JobContainer_%s"
// Environment variable set in every process in the job detailing where the containers volume
Expand Down Expand Up @@ -108,8 +99,9 @@ func Create(ctx context.Context, id string, s *specs.Spec) (_ cow.Container, _ *

// Create the job object all processes will run in.
options := &jobobject.Options{
Name: fmt.Sprintf(jobContainerNameFmt, id),
Notifications: true,
Name: fmt.Sprintf(jobContainerNameFmt, id),
Notifications: true,
EnableIOTracking: true,
}
job, err := jobobject.Create(ctx, options)
if err != nil {
Expand Down Expand Up @@ -232,7 +224,15 @@ func (c *JobContainer) CreateProcess(ctx context.Context, config interface{}) (_
if err != nil {
return nil, errors.Wrap(err, "failed to get default environment block")
}
env = append(env, envMapToSlice(conf.Environment)...)

// Convert environment map to a slice of environment variables in the form [Key1=val1, key2=val2]
var envs []string
for k, v := range conf.Environment {
expanded, _ := c.replaceWithMountPoint(v)
envs = append(envs, k+"="+expanded)
}
env = append(env, envs...)

env = append(env, sandboxMountPointEnvVar+"="+c.sandboxMount)

// exec.Cmd internally does its own path resolution and as part of this checks some well known file extensions on the file given (e.g. if
Expand Down Expand Up @@ -574,7 +574,7 @@ func systemProcessInformation() ([]*winapi.SYSTEM_PROCESS_INFORMATION, error) {
systemProcInfo = (*winapi.SYSTEM_PROCESS_INFORMATION)(unsafe.Pointer(&b[0]))
status := winapi.NtQuerySystemInformation(
winapi.SystemProcessInformation,
uintptr(unsafe.Pointer(systemProcInfo)),
unsafe.Pointer(systemProcInfo),
size,
&size,
)
Expand Down Expand Up @@ -603,7 +603,7 @@ func systemProcessInformation() ([]*winapi.SYSTEM_PROCESS_INFORMATION, error) {
return procInfos, nil
}

// Takes a string and replaces any occurences of CONTAINER_SANDBOX_MOUNT_POINT with where the containers volume is mounted, as well as returning
// Takes a string and replaces any occurrences of CONTAINER_SANDBOX_MOUNT_POINT with where the containers' volume is mounted, as well as returning
// if the string actually contained the environment variable.
func (c *JobContainer) replaceWithMountPoint(str string) (string, bool) {
newStr := strings.ReplaceAll(str, "%"+sandboxMountPointEnvVar+"%", c.sandboxMount[:len(c.sandboxMount)-1])
Expand Down
2 changes: 1 addition & 1 deletion internal/jobobject/iocp.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func pollIOCP(ctx context.Context, iocpHandle windows.Handle) {
}).Warn("failed to parse job object message")
continue
}
if err := msq.Write(notification); err == queue.ErrQueueClosed {
if err := msq.Enqueue(notification); err == queue.ErrQueueClosed {
// Enqueue will only return an error when the queue is closed.
// The only time a queue would ever be closed is when we call `Close` on
// the job it belongs to which also removes it from the jobMap, so something
Expand Down
55 changes: 47 additions & 8 deletions internal/jobobject/jobobject.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type Options struct {
// `UseNTVariant` specifies if we should use the `Nt` variant of Open/CreateJobObject.
// Defaults to false.
UseNTVariant bool
// `EnableIOTracking` enables tracking I/O statistics on the job object. More specifically this
// calls SetInformationJobObject with the JobObjectIoAttribution class.
EnableIOTracking bool
}

// Create creates a job object.
Expand Down Expand Up @@ -134,6 +137,12 @@ func Create(ctx context.Context, options *Options) (_ *JobObject, err error) {
job.mq = mq
}

if options.EnableIOTracking {
if err := enableIOTracking(jobHandle); err != nil {
return nil, err
}
}

return job, nil
}

Expand Down Expand Up @@ -235,7 +244,7 @@ func (job *JobObject) PollNotification() (interface{}, error) {
if job.mq == nil {
return nil, ErrNotRegistered
}
return job.mq.ReadOrWait()
return job.mq.Dequeue()
}

// UpdateProcThreadAttribute updates the passed in ProcThreadAttributeList to contain what is necessary to
Expand Down Expand Up @@ -330,7 +339,7 @@ func (job *JobObject) Pids() ([]uint32, error) {
err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicProcessIdList,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
)
Expand All @@ -356,7 +365,7 @@ func (job *JobObject) Pids() ([]uint32, error) {
if err = winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicProcessIdList,
uintptr(unsafe.Pointer(&buf[0])),
unsafe.Pointer(&buf[0]),
uint32(len(buf)),
nil,
); err != nil {
Expand Down Expand Up @@ -384,7 +393,7 @@ func (job *JobObject) QueryMemoryStats() (*winapi.JOBOBJECT_MEMORY_USAGE_INFORMA
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectMemoryUsageInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
Expand All @@ -406,7 +415,7 @@ func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectBasicAccountingInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
Expand All @@ -415,7 +424,9 @@ func (job *JobObject) QueryProcessorStats() (*winapi.JOBOBJECT_BASIC_ACCOUNTING_
return &info, nil
}

// QueryStorageStats gets the storage (I/O) stats for the job object.
// QueryStorageStats gets the storage (I/O) stats for the job object. This call will error
// if either `EnableIOTracking` wasn't set to true on creation of the job, or SetIOTracking()
// hasn't been called since creation of the job.
func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION, error) {
job.handleLock.RLock()
defer job.handleLock.RUnlock()
Expand All @@ -430,7 +441,7 @@ func (job *JobObject) QueryStorageStats() (*winapi.JOBOBJECT_IO_ATTRIBUTION_INFO
if err := winapi.QueryInformationJobObject(
job.handle,
winapi.JobObjectIoAttribution,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
Expand Down Expand Up @@ -476,7 +487,7 @@ func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {
status := winapi.NtQueryInformationProcess(
h,
winapi.ProcessVmCounters,
uintptr(unsafe.Pointer(&vmCounters)),
unsafe.Pointer(&vmCounters),
uint32(unsafe.Sizeof(vmCounters)),
nil,
)
Expand All @@ -497,3 +508,31 @@ func (job *JobObject) QueryPrivateWorkingSet() (uint64, error) {

return jobWorkingSetSize, nil
}

// SetIOTracking turns on IO tracking for the job object by delegating to
// enableIOTracking with the job's handle. QueryStorageStats relies on
// tracking having been enabled.
//
// It returns ErrAlreadyClosed when the job's handle is zero.
func (job *JobObject) SetIOTracking() error {
	// Hold the read lock for the whole call so the handle is not swapped
	// out from under us while it is in use.
	job.handleLock.RLock()
	defer job.handleLock.RUnlock()

	h := job.handle
	if h == 0 {
		return ErrAlreadyClosed
	}
	return enableIOTracking(h)
}

// enableIOTracking calls SetInformationJobObject with the
// JobObjectIoAttribution class and the
// JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE control flag on the given job
// handle. Any failure is wrapped and returned.
func enableIOTracking(job windows.Handle) error {
	attribution := winapi.JOBOBJECT_IO_ATTRIBUTION_INFORMATION{
		ControlFlags: winapi.JOBOBJECT_IO_ATTRIBUTION_CONTROL_ENABLE,
	}

	// The pointer-to-uintptr conversion is kept inside the call expression.
	_, err := windows.SetInformationJobObject(
		job,
		winapi.JobObjectIoAttribution,
		uintptr(unsafe.Pointer(&attribution)),
		uint32(unsafe.Sizeof(attribution)),
	)
	if err != nil {
		return fmt.Errorf("failed to enable IO tracking on job object: %w", err)
	}
	return nil
}
78 changes: 78 additions & 0 deletions internal/jobobject/jobobject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,84 @@ func TestJobCreateAndOpen(t *testing.T) {
defer jobOpen.Close()
}

func TestJobStats(t *testing.T) {
var (
ctx = context.Background()
options = &Options{
Name: "test",
EnableIOTracking: true,
}
)
job, err := Create(ctx, options)
if err != nil {
t.Fatal(err)
}
defer job.Close()

_, err = createProcsAndAssign(1, job)
if err != nil {
t.Fatal(err)
}

_, err = job.QueryMemoryStats()
if err != nil {
t.Fatal(err)
}

_, err = job.QueryProcessorStats()
if err != nil {
t.Fatal(err)
}

_, err = job.QueryStorageStats()
if err != nil {
t.Fatal(err)
}

if err := job.Terminate(1); err != nil {
t.Fatal(err)
}
}

func TestIOTracking(t *testing.T) {
var (
ctx = context.Background()
options = &Options{
Name: "test",
}
)
job, err := Create(ctx, options)
if err != nil {
t.Fatal(err)
}
defer job.Close()

_, err = createProcsAndAssign(1, job)
if err != nil {
t.Fatal(err)
}

_, err = job.QueryStorageStats()
// Element not found is returned if IO tracking isn't enabled.
if err != nil && !errors.Is(err, windows.ERROR_NOT_FOUND) {
t.Fatal(err)
}

// Turn it on and now the call should function.
if err := job.SetIOTracking(); err != nil {
t.Fatal(err)
}

_, err = job.QueryStorageStats()
if err != nil {
t.Fatal(err)
}

if err := job.Terminate(1); err != nil {
t.Fatal(err)
}
}

func createProcsAndAssign(num int, job *JobObject) (_ []*exec.Cmd, err error) {
var procs []*exec.Cmd

Expand Down
4 changes: 2 additions & 2 deletions internal/jobobject/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (job *JobObject) getExtendedInformation() (*windows.JOBOBJECT_EXTENDED_LIMI
if err := winapi.QueryInformationJobObject(
job.handle,
windows.JobObjectExtendedLimitInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
Expand All @@ -224,7 +224,7 @@ func (job *JobObject) getCPURateControlInformation() (*winapi.JOBOBJECT_CPU_RATE
if err := winapi.QueryInformationJobObject(
job.handle,
windows.JobObjectCpuRateControlInformation,
uintptr(unsafe.Pointer(&info)),
unsafe.Pointer(&info),
uint32(unsafe.Sizeof(info)),
nil,
); err != nil {
Expand Down
Loading

0 comments on commit e6107b7

Please sign in to comment.