Skip to content

Commit

Permalink
solver: recursively add merge source jobs to target and ancestors
Browse files Browse the repository at this point in the history
Signed-off-by: Erik Sipsma <[email protected]>
  • Loading branch information
sipsma committed May 7, 2024
1 parent 447857f commit 63352b0
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 18 deletions.
51 changes: 46 additions & 5 deletions solver/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,7 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) {
targetEdge.takeOwnership(e)

if targetState != nil {
targetState.mu.Lock()
for j := range s.jobs {
targetState.jobs[j] = struct{}{}
}
targetState.mu.Unlock()
targetState.addJobs(s, map[*state]struct{}{})

if _, ok := targetState.allPw[s.mpw]; !ok {
targetState.mpw.Add(s.mpw)
Expand All @@ -189,6 +185,51 @@ func (s *state) setEdge(index Index, targetEdge *edge, targetState *state) {
}
}

// addJobs recursively adds jobs to state and all its ancestors. currently
// only used during edge merges to add jobs from the source of the merge to the
// target and its ancestors.
// requires that Solver.mu is read-locked and srcState.mu is locked
func (s *state) addJobs(srcState *state, memo map[*state]struct{}) {
if _, ok := memo[s]; ok {
return
}
memo[s] = struct{}{}

s.mu.Lock()
defer s.mu.Unlock()

for j := range srcState.jobs {
s.jobs[j] = struct{}{}
}

for _, inputEdge := range s.vtx.Inputs() {
inputState, ok := s.solver.actives[inputEdge.Vertex.Digest()]
if !ok {
bklog.G(context.TODO()).
WithField("vertex_digest", inputEdge.Vertex.Digest()).
Error("input vertex not found during addJobs")
continue
}
inputState.addJobs(srcState, memo)

// tricky case: if the inputState's edge was *already* merged we should
// also add jobs to the merged edge's state
mergedInputEdge := inputState.getEdge(inputEdge.Index)
if mergedInputEdge == nil || mergedInputEdge.edge.Vertex.Digest() == inputEdge.Vertex.Digest() {
// not merged
continue
}
mergedInputState, ok := s.solver.actives[mergedInputEdge.edge.Vertex.Digest()]
if !ok {
bklog.G(context.TODO()).
WithField("vertex_digest", mergedInputEdge.edge.Vertex.Digest()).
Error("merged input vertex not found during addJobs")
continue
}
mergedInputState.addJobs(srcState, memo)
}
}

func (s *state) combinedCacheManager() CacheManager {
s.mu.Lock()
cms := make([]CacheManager, 0, len(s.cache)+1)
Expand Down
78 changes: 65 additions & 13 deletions solver/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3457,18 +3457,19 @@ func TestStaleEdgeMerge(t *testing.T) {
})
defer s.Close()

depV0 := vtxConst(1, vtxOpt{name: "depV0"})
depV1 := vtxConst(1, vtxOpt{name: "depV1"})
depV2 := vtxConst(1, vtxOpt{name: "depV2"})

// These should all end up edge merged
v0 := vtxAdd(2, vtxOpt{name: "v0", inputs: []Edge{
{Vertex: vtxConst(3, vtxOpt{})},
{Vertex: vtxConst(4, vtxOpt{})},
{Vertex: depV0},
}})
v1 := vtxAdd(2, vtxOpt{name: "v1", inputs: []Edge{
{Vertex: vtxConst(3, vtxOpt{})},
{Vertex: vtxConst(4, vtxOpt{})},
{Vertex: depV1},
}})
v2 := vtxAdd(2, vtxOpt{name: "v2", inputs: []Edge{
{Vertex: vtxConst(3, vtxOpt{})},
{Vertex: vtxConst(4, vtxOpt{})},
{Vertex: depV2},
}})

j0, err := s.NewJob("job0")
Expand All @@ -3478,6 +3479,11 @@ func TestStaleEdgeMerge(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives, depV0.Digest())
require.Contains(t, s.actives[depV0.Digest()].jobs, j0)

// this edge should be merged with the one from j0
j1, err := s.NewJob("job1")
require.NoError(t, err)
Expand All @@ -3486,14 +3492,37 @@ func TestStaleEdgeMerge(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives, depV0.Digest())
require.Contains(t, s.actives[depV0.Digest()].jobs, j0)
require.Contains(t, s.actives[depV0.Digest()].jobs, j1)

require.Contains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives[v1.Digest()].jobs, j0)
require.Contains(t, s.actives[v1.Digest()].jobs, j1)
require.Contains(t, s.actives, depV1.Digest())
require.NotContains(t, s.actives[depV1.Digest()].jobs, j0)
require.Contains(t, s.actives[depV1.Digest()].jobs, j1)

// discard j0, verify that v0 is still active and it's state contains j1 since j1's
// edge was merged to v0's state
require.NoError(t, j0.Discard())

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives, depV0.Digest())
require.NotContains(t, s.actives[depV0.Digest()].jobs, j0)
require.Contains(t, s.actives[depV0.Digest()].jobs, j1)

require.Contains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives[v1.Digest()].jobs, j0)
require.Contains(t, s.actives[v1.Digest()].jobs, j1)
require.Contains(t, s.actives, depV1.Digest())
require.NotContains(t, s.actives[depV1.Digest()].jobs, j0)
require.Contains(t, s.actives[depV1.Digest()].jobs, j1)

// verify another job can still merge
j2, err := s.NewJob("job2")
Expand All @@ -3504,29 +3533,52 @@ func TestStaleEdgeMerge(t *testing.T) {
require.NotNil(t, res)

require.Contains(t, s.actives, v0.Digest())
require.Contains(t, s.actives, v1.Digest())
require.Contains(t, s.actives, v2.Digest())
require.NotContains(t, s.actives[v0.Digest()].jobs, j0)
require.Contains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives[v0.Digest()].jobs, j2)
require.Contains(t, s.actives, depV0.Digest())
require.Contains(t, s.actives[depV0.Digest()].jobs, j1)
require.Contains(t, s.actives[depV0.Digest()].jobs, j2)

require.Contains(t, s.actives, v1.Digest())
require.Contains(t, s.actives[v1.Digest()].jobs, j1)
require.NotContains(t, s.actives[v1.Digest()].jobs, j2)
require.Contains(t, s.actives, depV1.Digest())
require.Contains(t, s.actives[depV1.Digest()].jobs, j1)
require.NotContains(t, s.actives[depV1.Digest()].jobs, j2)

require.Contains(t, s.actives, v2.Digest())
require.NotContains(t, s.actives[v2.Digest()].jobs, j1)
require.Contains(t, s.actives[v2.Digest()].jobs, j2)
require.Contains(t, s.actives, depV2.Digest())
require.NotContains(t, s.actives[depV2.Digest()].jobs, j1)
require.Contains(t, s.actives[depV2.Digest()].jobs, j2)

// discard j1, verify only referenced edges still exist
require.NoError(t, j1.Discard())

require.Contains(t, s.actives, v0.Digest())
require.NotContains(t, s.actives, v1.Digest())
require.Contains(t, s.actives, v2.Digest())
require.NotContains(t, s.actives[v0.Digest()].jobs, j0)
require.NotContains(t, s.actives[v0.Digest()].jobs, j1)
require.Contains(t, s.actives[v0.Digest()].jobs, j2)
require.Contains(t, s.actives, depV0.Digest())
require.NotContains(t, s.actives[depV0.Digest()].jobs, j1)
require.Contains(t, s.actives[depV0.Digest()].jobs, j2)

require.NotContains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives, depV1.Digest())

require.Contains(t, s.actives, v2.Digest())
require.Contains(t, s.actives[v2.Digest()].jobs, j2)
require.Contains(t, s.actives, depV2.Digest())
require.Contains(t, s.actives[depV2.Digest()].jobs, j2)

// discard the last job and verify everything was removed now
require.NoError(t, j2.Discard())
require.NotContains(t, s.actives, v0.Digest())
require.NotContains(t, s.actives, v1.Digest())
require.NotContains(t, s.actives, v2.Digest())
require.NotContains(t, s.actives, depV0.Digest())
require.NotContains(t, s.actives, depV1.Digest())
require.NotContains(t, s.actives, depV2.Digest())
}

func generateSubGraph(nodes int) (Edge, int) {
Expand Down

0 comments on commit 63352b0

Please sign in to comment.