Skip to content

Commit

Permalink
Remove MultiPacketEncoder
Browse files Browse the repository at this point in the history
It needs to be rewritten from scratch anyway. See #32

Related: #32
  • Loading branch information
zapek committed Jun 12, 2022
1 parent 6aee052 commit 9323cad
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,164 +19,13 @@

package io.xeres.app.net.peer.pipeline;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.ScheduledFuture;
import io.xeres.app.net.peer.PeerAttribute;
import io.xeres.app.net.peer.packet.Packet;
import org.apache.commons.lang3.NotImplementedException;

import java.util.PriorityQueue;

import static io.xeres.app.net.peer.packet.MultiPacket.MAXIMUM_ID;

// XXX: this is a mess... rewrite it later when I'll have a better architecture. basically I don't even know if I can do that with netty properly
// something like... intercept the writabilityChanged event... don't pass it up as we can still fill in our queue (then pass it, to not gobble all memory).
// then write once we get the event again.. and so on. I think it has to be a ChannelDuplex subclass too! though I don't see where I can get the events... sigh
// or... just write() then use flush() after either 1/2 or 1/4 of a seconds or if a high priority packet comes (should the high priority one really get in front?)
// if it does have to then it's a bit more complicated
public class MultiPacketEncoder extends ChannelOutboundHandlerAdapter // XXX: must extend ChannelOutboundHandlerAdapter
public class MultiPacketEncoder extends ChannelOutboundHandlerAdapter
{
private static final boolean USE_PACKET_SLICING = false; // XXX: set that to TRUE to get the "proper" logic...
private static final boolean USE_PACKET_GROUPING = false;
private static final boolean USE_QOS = false;

private final PriorityQueue<Packet> queue = new PriorityQueue<>();
private ScheduledFuture<Void> flusher;

private int written;
private final int hiWater = Packet.OPTIMAL_PACKET_SIZE;
private int packetId;

// XXX: we can override read() too!

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
{
var packet = (Packet) msg;

if (USE_PACKET_SLICING && Boolean.TRUE.equals(ctx.channel().attr(PeerAttribute.MULTI_PACKET).get()))
{
if (USE_PACKET_GROUPING)
{
if (getPacketSizeWithHeader(packet) + written > hiWater)
{
// Slice and send (note that original RS doesn't do it)
}
}
// XXX: if we add to the queue, we need to send a new promise I think... so it knows it was "written" and can read more
// XXX: well, same problem then! it needs ctx.write(Unpooled.EMPTY_BUFFER, promise)

enqueue(packet);
if (packet.isRealtimePriority())
{
flusher.cancel(false);
//writePacket(ctx);
ctx.flush();
}
else
{
//if (false)
//{
// XXX: this doesn't work because MessageToByteEncoder does send an empty buffer if we didn't write anything. do we have to subclass ChannelOutboundHandlerAdapter then? this seems complicated... there must be an easier way
// XXX: actually! maybe the "easier" way would be to queue before sending...

// XXX: otherwise! just issue write() calls here and only flush() with the executor. it's the flush() which executes the send() syscall
// so just fill with write() until we reach 512? and either flush directly or with the executor?
// XXX: !!! there's a FlushConsolidationHandler! Check it! -> well, no. this is not what we want
//ByteBuf slice = out.retainedSlice();
//flusher = ctx.executor().schedule(() -> writePacket(ctx, slice), 250, TimeUnit.MILLISECONDS); // XXX: will have to tweak the delays... also depends on the packet priority I guess. MAKE SURE THIS RUNS ON THE SAME THREAD as encode()!!!
//}
//else
//{
//writePacket(ctx, out);
//}
}
}
else
{
// Send the old way
//ctx.writeAndFlush(packet.getData(), promise); // XXX: is this correct?! maybe not...
}
}

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
{
flusher.cancel(false);
// XXX: do we have to wait for writeFuture()?!
super.close(ctx, promise);
}

private void enqueue(Packet packet)
{
var remainingSize = packet.getSize();

if (remainingSize >= Packet.OPTIMAL_PACKET_SIZE)
{
var offset = 0;
var sequence = 0;

while (remainingSize > 0)
{
var copySize = Math.min(remainingSize, Packet.OPTIMAL_PACKET_SIZE);

//RsPacket slice = new RsPacket(packet.getPriority());
var newData = new byte[copySize];
//System.arraycopy(packet.getData(), offset, newData, 0, copySize);
//slice.setData(newData);
//slice.setId(packetId);
//slice.setSequence(sequence);
if (offset == 0)
{
//slice.setStart(true);
}

sequence++;
offset += copySize;
remainingSize -= copySize;
if (remainingSize == 0)
{
//slice.setEnd(true);
}
//queue.add(slice);
}
incrementId();
}
else
{
queue.add(packet); // XXX: how do we ensure this won't grow too much? (ie. peer stopped responding...). we probably have to send some "read" event down the line
}
}

private void writePacket(ChannelHandlerContext ctx, Packet packet, ByteBuf out)
{
var sizeWithHeader = getPacketSizeWithHeader(packet);

out.ensureWritable(sizeWithHeader); // XXX: IndexOutOfBoundsException -> close the connection if it's the case. maybe it already does it

out.writeByte(Packet.SLICE_PROTOCOL_VERSION_ID_01);
//out.writeByte(packet.getFlags());
//out.writeInt(packet.getId());
out.writeShort(packet.getSize());
//out.writeBytes(packet.getData());

ctx.write(out);
written += sizeWithHeader;
}

private int getPacketSizeWithHeader(Packet packet)
{
return packet.getSize() + 8;
}

private void incrementId()
public MultiPacketEncoder()
{
packetId++;
if (packetId >= MAXIMUM_ID)
{
packetId = 0;
}
throw new NotImplementedException("MultiPacketEncoder is not available yet, see #32");
}
}
1 change: 0 additions & 1 deletion app/src/main/java/io/xeres/app/xrs/common/Signature.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Arrays;
import java.util.Objects;

// XXX: maybe should be in crypto? not sure... It's related to identities but I don't like the structure
public record Signature(GxsId gxsId, byte[] data)
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,11 @@ private enum Invitation

private ScheduledExecutorService executorService;

public ChatRsService(Environment environment, PeerConnectionManager peerConnectionManager, LocationService locationService, PeerConnectionManager peerConnectionManager1, IdentityService identityService, ChatRoomService chatRoomService, DatabaseSessionManager databaseSessionManager, IdentityManager identityManager)
public ChatRsService(Environment environment, PeerConnectionManager peerConnectionManager, LocationService locationService, IdentityService identityService, ChatRoomService chatRoomService, DatabaseSessionManager databaseSessionManager, IdentityManager identityManager)
{
super(environment);
this.locationService = locationService;
this.peerConnectionManager = peerConnectionManager1;
this.peerConnectionManager = peerConnectionManager;
this.identityService = identityService;
this.chatRoomService = chatRoomService;
this.databaseSessionManager = databaseSessionManager;
Expand Down
8 changes: 7 additions & 1 deletion app/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ xrs.ui.enabled=true
xrs.ui.address=localhost
xrs.ui.port=1066
#xrs.ui.ssl.enabled=false XXX: add support for that

## Database
# Cache size (in KB)
xrs.db.cache-size=1024

# Actuator
info.java.vm.vendor=${java.vm.vendor}
info.java.version=${java.version}
Expand All @@ -36,11 +38,15 @@ management.endpoint.shutdown.enabled=true
management.endpoints.web.exposure.include=info,health,env,logfile,shutdown
management.endpoints.web.base-path=/api/v1/actuator
springdoc.show-actuator=true

## Network
# Use the new packet slicing system (currently broken)
# Use the new packet slicing system (not implemented yet, receiving always works)
xrs.network.packet-slicing=false
# Use the new packet grouping mechanism (not implemented yet, receiving always works)
xrs.network.packet-grouping=false
# Use the DHT (not finished yet)
xrs.network.dht=false

# RsServices
xrs.service.rtt.enabled=true
xrs.service.sliceprobe.enabled=true
Expand Down
4 changes: 0 additions & 4 deletions ui/src/main/resources/view/javafx.css
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@
/*noinspection CssUnusedSymbol*/
.oldchatlist .list-cell {
-fx-control-inner-background-alt: -fx-control-inner-background;
}

/*noinspection CssUnusedSymbol*/
.oldchatlist .list-cell {
-fx-padding: 0px 0.25em 0px 0.25em
}

Expand Down

0 comments on commit 9323cad

Please sign in to comment.