Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opstream teardowns #353

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
"paparam": "^1.4.0",
"pear-changelog": "^1.0.1",
"pear-interface": "^1.0.0",
"pear-ipc": "^2.2.8",
"pear-ipc": "^2.3.0",
"pear-link": "^2.0.1",
"pear-updater": "^3.1.0",
"pear-updater-bootstrap": "^1.2.0",
Expand Down
25 changes: 17 additions & 8 deletions subsystems/sidecar/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const encryptionKeys = new Store('encryption-keys')
const knownNodes = new Store('dht')
const SharedState = require('../../state')
const State = require('./state')
const Freelist = require('./lib/freelist')
const { preferences } = State
const ops = {
GC: require('./ops/gc'),
Expand Down Expand Up @@ -88,6 +89,8 @@ class Sidecar extends ReadyResource {
? flags.dhtBootstrap.split(',').map(e => ({ host: e.split(':')[0], port: Number(e.split(':')[1]) }))
: flags.dhtBootstrap

this.opstreams = new Freelist()

this.bus = new Iambus()
this.version = CHECKOUT

Expand All @@ -103,7 +106,13 @@ class Sidecar extends ReadyResource {
this.ipc = new IPC({
handlers: this,
lock: PLATFORM_LOCK,
socketPath: SOCKET_PATH
socketPath: SOCKET_PATH,
onpipeline: (_, stream) => {
const id = this.opstreams.alloc(stream)
stream.once('close', () => {
this.opstreams.free(id)
})
}
})

this.ipc.on('client', (client) => {
Expand Down Expand Up @@ -491,13 +500,13 @@ class Sidecar extends ReadyResource {

shutdown (params, client) { return this.#shutdown(client) }

#teardownPipelines (client) {
// TODO: instead of client._rpc collect src and dst streams in sidecar, do push(null) on src stream, listen for close on dst stream
const streams = client._rpc._handlers.flatMap((m) => m?._streams).filter((m) => m?.destroyed === false)
return Promise.all(streams.map((stream) => new Promise((resolve) => {
stream.once('close', resolve)
stream.end()
})))
#teardownPipelines () {
return Promise.all([...this.opstreams].map((s) => {
return new Promise((resolve) => {
s.once('close', () => { resolve() })
s.push(null)
})
}))
}

closeClients () {
Expand Down
33 changes: 33 additions & 0 deletions subsystems/sidecar/lib/freelist.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module.exports = class Freelist {
alloced = []
freed = []
nextId () {
return this.freed.length === 0 ? this.alloced.length : this.freed[this.freed.length - 1]
}

alloc (item) {
const id = this.freed.length === 0 ? this.alloced.push(null) - 1 : this.freed.pop()
this.alloced[id] = item
return id
}

free (id) {
this.freed.push(id)
this.alloced[id] = null
}

from (id) {
return id < this.alloced.length ? this.alloced[id] : null
}

emptied () {
return this.freed.length === this.alloced.length
}

* [Symbol.iterator] () {
for (const item of this.alloced) {
if (item === null) continue
yield item
}
}
}
Loading