Skip to content
This repository has been archived by the owner on Jul 23, 2019. It is now read-only.

Commit

Permalink
Handle websocket connections in xray_browser's script/server
Browse files Browse the repository at this point in the history
...and remove websocket handling from xray_server, which will now only
accept TCP connections.
  • Loading branch information
Antonio Scandurra authored and Nathan Sobo committed Apr 27, 2018
1 parent 61c2c19 commit d96746d
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 402 deletions.
331 changes: 0 additions & 331 deletions Cargo.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions xray_browser/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion xray_browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"express": "^4.16.3",
"webpack": "^4.6.0",
"webpack-cli": "^2.0.15",
"webpack-dev-middleware": "^3.1.2"
"webpack-dev-middleware": "^3.1.2",
"ws": "^5.1.1"
}
}
47 changes: 46 additions & 1 deletion xray_browser/script/server
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
#!/usr/bin/env node

const assert = require("assert");
const http = require("http");
const express = require("express");
const ws = require("ws");
const { Socket } = require("net");
const path = require("path");
const webpack = require("webpack");
const webpackDev = require("webpack-dev-middleware");
express.static.mime.types["wasm"] = "application/wasm";

const PORT = 3000;
const app = express();
const server = http.createServer(app);

const compiler = webpack([
{
Expand All @@ -29,7 +34,47 @@ const compiler = webpack([
}
]);

const websocketServer = new ws.Server({ server, path: "/ws" });
websocketServer.on("connection", async ws => {
const connection = new Socket();

let incomingMessage = null;
let remainingBytes = 0;
connection.on("data", data => {
let offset = 0;
while (offset < data.length) {
if (incomingMessage) {
assert(remainingBytes !== 0, "remainingBytes should not be 0");
const copiedBytes = data.copy(
incomingMessage,
incomingMessage.length - remainingBytes,
offset,
offset + remainingBytes
);
remainingBytes -= copiedBytes;
offset += copiedBytes;
} else {
remainingBytes = data.readUInt32BE(offset);
incomingMessage = Buffer.alloc(remainingBytes);
offset += 4;
}

if (incomingMessage && remainingBytes === 0) {
ws.send(incomingMessage);
incomingMessage = null;
}
}
});

await new Promise(resolve => connection.connect(8080, resolve));
ws.on("message", message => {
const bufferLengthHeader = Buffer.alloc(4);
bufferLengthHeader.writeUInt32BE(message.length, 0);
connection.write(Buffer.concat([bufferLengthHeader, message]));
});
});

app.use(webpackDev(compiler, { publicPath: "/" }));
app.use("/", express.static(path.join(__dirname, "../static")));
// app.use("/", express.static(path.join(__dirname, "../dist")));
app.listen(PORT, () => console.log("Listening to port " + PORT));
server.listen(PORT, () => console.log("Listening to port " + PORT));
7 changes: 4 additions & 3 deletions xray_browser/src/ui.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { React, ReactDOM, App, buildViewRegistry } from "xray_ui"
import { React, ReactDOM, App, buildViewRegistry } from "xray_ui";
import XrayClient from "./client";
const $ = React.createElement;

const client = new XrayClient(new Worker("worker.js"));
const websocketURL = "ws://127.0.0.1:9999";
client.sendMessage({ type: "ConnectToWebsocket", url: websocketURL });
const websocketURL = new URL("/ws", window.location.href);
websocketURL.protocol = "ws";
client.sendMessage({ type: "ConnectToWebsocket", url: websocketURL.href });

const viewRegistry = buildViewRegistry(client);

Expand Down
2 changes: 1 addition & 1 deletion xray_browser/src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const encoder = new TextEncoder();
const decoder = new TextDecoder("utf-8");
const serverPromise = xrayPromise.then(xray => new Server(xray));

global.addEventListener("message", async (event) => {
global.addEventListener("message", async event => {
const message = event.data;
const server = await serverPromise;
server.handleMessage(message);
Expand Down
9 changes: 1 addition & 8 deletions xray_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ const USAGE: &'static str = "
Xray
Usage:
xray [--socket-path=<path>] [--headless] [--listen=<port>] [--websocket=<port>] [--connect=<address>] [<path>...]
xray [--socket-path=<path>] [--headless] [--listen=<port>] [--connect=<address>] [<path>...]
xray (-h | --help)
Options:
-h --help Show this screen.
-H --headless Start Xray in headless mode.
-l --listen=<port> Listen for TCP connections on the specified port.
-w --websocket=<port> Listen for Websocket connections on the specified port.
-c --connect=<address> Connect to the specified address.
";

Expand All @@ -37,7 +36,6 @@ enum ServerRequest {
OpenWorkspace { paths: Vec<PathBuf> },
ConnectToPeer { address: SocketAddr },
TcpListen { port: PortNumber },
WebsocketListen { port: PortNumber },
}

#[derive(Deserialize)]
Expand All @@ -52,7 +50,6 @@ struct Args {
flag_socket_path: Option<String>,
flag_headless: Option<bool>,
flag_listen: Option<PortNumber>,
flag_websocket: Option<PortNumber>,
flag_connect: Option<SocketAddr>,
arg_path: Vec<PathBuf>,
}
Expand Down Expand Up @@ -122,10 +119,6 @@ fn launch() -> Result<(), String> {
send_message(&mut socket, ServerRequest::TcpListen { port })?;
}

if let Some(port) = args.flag_websocket {
send_message(&mut socket, ServerRequest::WebsocketListen { port })?;
}

Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion xray_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,4 @@ tokio-io = "0.1"
tokio-core = "0.1"
tokio-process = "0.1"
tokio-uds = "0.1"
websocket = "0.20"
xray_core = {path = "../xray_core"}
1 change: 0 additions & 1 deletion xray_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_process;
extern crate tokio_uds;
extern crate websocket;
extern crate xray_core;

use std::env;
Expand Down
3 changes: 0 additions & 3 deletions xray_server/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ pub enum IncomingMessage {
TcpListen {
port: u16,
},
WebsocketListen {
port: u16,
},
StartWindow {
window_id: WindowId,
height: f64,
Expand Down
52 changes: 0 additions & 52 deletions xray_server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::rc::Rc;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor;
use tokio_io::codec;
use websocket;
use xray_core::app::Command;
use xray_core::{self, App, Never, WindowId};

Expand Down Expand Up @@ -167,9 +166,6 @@ impl Server {
Box::new(self.open_workspace(paths).into_future())
}
IncomingMessage::TcpListen { port } => Box::new(self.tcp_listen(port).into_future()),
IncomingMessage::WebsocketListen { port } => {
Box::new(self.websocket_listen(port).into_future())
}
IncomingMessage::ConnectToPeer { address } => self.connect_to_peer(address),
_ => Box::new(future::err(format!("Unexpected message {:?}", message))),
};
Expand Down Expand Up @@ -230,54 +226,6 @@ impl Server {
Ok(())
}

fn websocket_listen(&self, port: u16) -> Result<(), String> {
let local_addr = SocketAddr::new("127.0.0.1".parse().unwrap(), port);
let listener = websocket::async::Server::bind(local_addr, &self.reactor)
.map_err(|_| "Error binding address".to_owned())?;
let app = self.app.clone();
let reactor = self.reactor.clone();
let handle_incoming = listener
.incoming()
.map_err(|_| eprintln!("Error accepting incoming connection"))
.for_each(move |(upgrade, _)| {
let app = app.clone();
let reactor = reactor.clone();
upgrade
.accept()
.map_err(|_| eprintln!("Error during websocket handshake"))
.and_then(move |(transport, _headers)| {
let (tx, rx) = transport.split();
let rx = rx.filter_map(|message| match message {
websocket::message::OwnedMessage::Binary(bytes) => {
Some(Bytes::from(bytes))
}
message @ _ => {
eprintln!("Received unknown message: {:?}", message);
None
}
}).map_err(|error| match error {
websocket::result::WebSocketError::IoError(error) => error,
error @ _ => io::Error::new(io::ErrorKind::Other, error.description()),
});
let connection = App::connect_to_client(app.clone(), rx);
reactor.spawn(
tx.send_all(
connection
.map(|message| {
// TODO: consider going back to Vec<u8> to represent
// messages on the way out to avoid this allocation.
websocket::message::OwnedMessage::Binary(message.to_vec())
})
.map_err(|_| -> io::Error { unreachable!() }),
).then(|_| Ok(())),
);
Ok(())
})
});
self.reactor.spawn(handle_incoming);
Ok(())
}

fn connect_to_peer(&self, address: SocketAddr) -> Box<Future<Item = (), Error = String>> {
let reactor = self.reactor.clone();
let app = self.app.clone();
Expand Down

0 comments on commit d96746d

Please sign in to comment.