Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve EthereumListener #1152

Open
wants to merge 36 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
27c10b1
Replaces CompositeEthereumListener with new pub/sub model implementat…
Aug 8, 2018
8165a2c
Removes (almost) EthereumListenerAdaptor using
Aug 10, 2018
53e0173
Adds unsubscribeAfter(event) logic
Aug 10, 2018
d2fd027
Makes Publisher dependant from abstract executor. Adds additional log…
Aug 10, 2018
80fe9f8
Merge branch 'develop' into feature/1138-event-listener-improve
Aug 10, 2018
7a27d8d
Replace EthereumListener to Publisher usage
Aug 10, 2018
350c5d1
Merge branch 'develop' into feature/1138-event-listener-improve
Aug 14, 2018
c216d32
Renames all events by removing 'Event' suffix.
Aug 14, 2018
9627991
Removes experimental publisher
Aug 14, 2018
86e7ca4
Adds backward compatibility EthereumListener proxy component.
Aug 14, 2018
c54d4b9
Migrate to backeard compatibility EthereumListener using.
Aug 14, 2018
acb5e89
Deletes unused events + some events rename.
Aug 14, 2018
06c7022
Removes import wilcards.
Aug 15, 2018
f643b3c
Adds one-off condition to subscription.
Aug 15, 2018
5fcd91f
Adds subscription with life cycle abstraction.
Aug 15, 2018
7fc9119
Reverts formattings changes.
Aug 17, 2018
e1f739c
Merge branch 'develop' into feature/1138-event-listener-improve
Aug 17, 2018
2e07934
Adds concurrency testing.
Aug 20, 2018
7bf20ff
Merge branch 'develop' into feature/1138-event-listener-improve
Aug 20, 2018
e41e31b
Moves LifeCycle to Subscription.
Aug 22, 2018
ab4754d
Replaces EthereumListener.STUB anonymous class with EthereumListenerA…
Aug 22, 2018
b214302
Adds more convenient subscribing methods to Ethereum interface.
Aug 22, 2018
003ae12
Reverts RecommendedGasPriceTracker
Aug 22, 2018
7288636
Adds copyright headers.
Aug 22, 2018
442046b
Merge branch 'develop' into feature/1138-event-listener-improve
Aug 22, 2018
a06646a
Merge branch 'develop' into feature/1138-event-listener-improve
Sep 6, 2018
698b59a
Clones deprecated EthereumListener's state enums for Publisher events.
Sep 6, 2018
feeff0c
Merge branch 'develop' into feature/1138-event-listener-improve
Sep 21, 2018
d463bd1
Merge branch 'develop' into feature/1138-event-listener-improve
Sep 21, 2018
0d61180
Fixes 'this' reference error.
Sep 21, 2018
00d4609
Adds extra shortcuts to Publisher. Adds Events.Type constants. Fixes …
Oct 8, 2018
3bd0f73
Develop merge
Oct 8, 2018
67e9c6a
Fixes typo.
Oct 8, 2018
d576607
Adds minor fixes
Oct 8, 2018
f0bf19a
Adds several comments/
Oct 9, 2018
7ee5811
Adds copyright headers.
Oct 9, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 42 additions & 21 deletions ethereumj-core/src/main/java/org/ethereum/config/CommonConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,36 @@
*/
package org.ethereum.config;

import org.ethereum.core.EventDispatchThread;
import org.ethereum.core.Repository;
import org.ethereum.crypto.HashUtil;
import org.ethereum.datasource.*;
import org.ethereum.datasource.inmem.HashMapDB;
import org.ethereum.datasource.leveldb.LevelDbDataSource;
import org.ethereum.datasource.rocksdb.RocksDbDataSource;
import org.ethereum.db.*;
import org.ethereum.db.DbFlushManager;
import org.ethereum.db.HeaderStore;
import org.ethereum.db.PeerSource;
import org.ethereum.db.RepositoryRoot;
import org.ethereum.db.RepositoryWrapper;
import org.ethereum.db.StateSource;
import org.ethereum.listener.CompositeEthereumListener;
import org.ethereum.listener.EthereumListener;
import org.ethereum.net.eth.handler.Eth63;
import org.ethereum.publish.Publisher;
import org.ethereum.sync.FastSyncManager;
import org.ethereum.validator.*;
import org.ethereum.vm.DataWord;
import org.ethereum.vm.program.ProgramPrecompile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -76,7 +88,8 @@ BeanPostProcessor initializer() {
}


@Bean @Primary
@Bean
@Primary
public Repository repository() {
return new RepositoryWrapper();
}
Expand All @@ -86,15 +99,16 @@ public Repository defaultRepository() {
return new RepositoryRoot(stateSource(), null);
}

@Bean @Scope("prototype")
@Bean
@Scope("prototype")
public Repository repository(byte[] stateRoot) {
return new RepositoryRoot(stateSource(), stateRoot);
}

/**
* A source of nodes for state trie and all contract storage tries. <br/>
* This source provides contract code too. <br/><br/>
*
* <p>
* Picks node by 16-bytes prefix of its key. <br/>
* Within {@link NodeKeyCompositor} this source is a part of ref counting workaround<br/><br/>
*
Expand Down Expand Up @@ -126,7 +140,7 @@ public StateSource stateSource() {
@Bean
@Scope("prototype")
public Source<byte[], byte[]> cachedDbSource(String name) {
AbstractCachedSource<byte[], byte[]> writeCache = new AsyncWriteCache<byte[], byte[]>(blockchainSource(name)) {
AbstractCachedSource<byte[], byte[]> writeCache = new AsyncWriteCache<byte[], byte[]>(blockchainSource(name)) {
@Override
protected WriteCache<byte[], byte[]> createCache(Source<byte[], byte[]> source) {
WriteCache.BytesKey<byte[]> ret = new WriteCache.BytesKey<>(source, WriteCache.CacheType.SIMPLE);
Expand Down Expand Up @@ -166,7 +180,7 @@ public DbSource<byte[]> keyValueDataSource(String name, DbSettings settings) {
DbSource<byte[]> dbSource;
if ("inmem".equals(dataSource)) {
dbSource = new HashMapDB<>();
} else if ("leveldb".equals(dataSource)){
} else if ("leveldb".equals(dataSource)) {
dbSource = levelDbDataSource();
} else {
dataSource = "rocksdb";
Expand Down Expand Up @@ -221,9 +235,14 @@ private void resetDataSource(Source source) {
}
}

@Bean(name = "EthereumListener")
public CompositeEthereumListener ethereumListener() {
return new CompositeEthereumListener();
@Bean
public Publisher publisher(EventDispatchThread eventDispatchThread) {
return new Publisher(eventDispatchThread);
}

@Bean
public CompositeEthereumListener compositeEthereumListener(EventDispatchThread eventDispatchThread) {
mkalinin marked this conversation as resolved.
Show resolved Hide resolved
return new CompositeEthereumListener(eventDispatchThread);
}

@Bean
Expand Down Expand Up @@ -261,16 +280,18 @@ public byte[] serialize(byte[] object) {
DataWord addResult = ret.add(DataWord.ONE);
return addResult.getLast20Bytes();
}

public byte[] deserialize(byte[] stream) {
throw new RuntimeException("Shouldn't be called");
}
}, new Serializer<ProgramPrecompile, byte[]>() {
public byte[] serialize(ProgramPrecompile object) {
return object == null ? null : object.serialize();
}
public ProgramPrecompile deserialize(byte[] stream) {
return stream == null ? null : ProgramPrecompile.deserialize(stream);
}
public byte[] serialize(ProgramPrecompile object) {
return object == null ? null : object.serialize();
}

public ProgramPrecompile deserialize(byte[] stream) {
return stream == null ? null : ProgramPrecompile.deserialize(stream);
}
});
}

Expand All @@ -289,14 +310,14 @@ public DbFlushManager dbFlushManager() {
}

@Bean
public BlockHeaderValidator headerValidator() {
public BlockHeaderValidator headerValidator(SystemProperties systemProperties, Publisher publisher) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it'll be handy in all cases.
Let's say you need to get headerValidator out of CommonConfig then you will have to make a tricky call like commonConfig.headerValidator(commonConfig.systemProperties(), commonConfig.publisher()) instead of just commonConfig.headerValidator()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eugene-shevchenko any thoughts on that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method uses only in java config, and as I mentioned earlier such bean definition hides component dependencies. With no arguments option I should invoke three factory methods in method's body to get necessary dependencies. But if you think that no-args option is better I can revert method's signature.

PS: It's sad that java config which should configure ApplicationContext only, we use like object factory.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not a bad thing if we want to get rid of Spring one day.
It's more deterministic way.


List<BlockHeaderRule> rules = new ArrayList<>(asList(
new GasValueRule(),
new ExtraDataRule(systemProperties()),
EthashRule.createRegular(systemProperties(), ethereumListener()),
new GasLimitRule(systemProperties()),
new BlockHashRule(systemProperties())
new ExtraDataRule(systemProperties),
EthashRule.createRegular(systemProperties, publisher),
new GasLimitRule(systemProperties),
new BlockHashRule(systemProperties)
));

return new BlockHeaderValidator(rules);
Expand Down
53 changes: 31 additions & 22 deletions ethereumj-core/src/main/java/org/ethereum/core/BlockchainImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@
import org.ethereum.config.SystemProperties;
import org.ethereum.crypto.HashUtil;
import org.ethereum.datasource.inmem.HashMapDB;
import org.ethereum.db.*;
import org.ethereum.trie.Trie;
import org.ethereum.trie.TrieImpl;
import org.ethereum.db.BlockStore;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.db.DbFlushManager;
import org.ethereum.db.HeaderStore;
import org.ethereum.db.IndexedBlockStore;
import org.ethereum.db.PruneManager;
import org.ethereum.db.StateSource;
import org.ethereum.db.TransactionStore;
import org.ethereum.listener.EthereumListener;
import org.ethereum.listener.EthereumListenerAdapter;
import org.ethereum.manager.AdminInfo;
import org.ethereum.sync.SyncManager;
import org.ethereum.util.*;
import org.ethereum.trie.Trie;
import org.ethereum.trie.TrieImpl;
import org.ethereum.util.AdvancedDeviceUtils;
import org.ethereum.util.ByteUtil;
import org.ethereum.util.FastByteComparisons;
import org.ethereum.util.RLP;
import org.ethereum.validator.DependentBlockHeaderRule;
import org.ethereum.validator.ParentBlockHeaderValidator;
import org.ethereum.vm.program.invoke.ProgramInvokeFactory;
Expand Down Expand Up @@ -65,7 +74,11 @@
import static java.math.BigInteger.ZERO;
import static java.util.Collections.emptyList;
import static org.ethereum.core.Denomination.SZABO;
import static org.ethereum.core.ImportResult.*;
import static org.ethereum.core.ImportResult.EXIST;
import static org.ethereum.core.ImportResult.IMPORTED_BEST;
import static org.ethereum.core.ImportResult.IMPORTED_NOT_BEST;
import static org.ethereum.core.ImportResult.INVALID_BLOCK;
import static org.ethereum.core.ImportResult.NO_PARENT;
import static org.ethereum.crypto.HashUtil.sha3;
import static org.ethereum.util.ByteUtil.toHexString;

Expand Down Expand Up @@ -110,7 +123,8 @@ public class BlockchainImpl implements Blockchain, org.ethereum.facade.Blockchai
private static final int MAGIC_REWARD_OFFSET = 8;
public static final byte[] EMPTY_LIST_HASH = sha3(RLP.encodeList(new byte[0]));

@Autowired @Qualifier("defaultRepository")
@Autowired
@Qualifier("defaultRepository")
private Repository repository;

@Autowired
Expand Down Expand Up @@ -187,11 +201,11 @@ public BlockchainImpl(final SystemProperties config) {
}

//todo: autowire over constructor
public BlockchainImpl(final BlockStore blockStore, final Repository repository) {
public BlockchainImpl(final BlockStore blockStore, final Repository repository, EthereumListener listener) {
this.blockStore = blockStore;
this.repository = repository;
this.adminInfo = new AdminInfo();
this.listener = new EthereumListenerAdapter();
this.listener = listener;
this.parentHeaderValidator = null;
this.transactionStore = new TransactionStore(new HashMapDB());
this.eventDispatchThread = EventDispatchThread.getDefault();
Expand All @@ -209,11 +223,6 @@ public BlockchainImpl withAdminInfo(AdminInfo adminInfo) {
return this;
}

public BlockchainImpl withEthereumListener(EthereumListener listener) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Listener doesn't look the thing which is required for running BlockchainImpl. Also old constructor could be used by somebody. I'd prefer to keep both old constructor (and init listener there with EthereumListener.EMPTY) and withEthereumListener method.

this.listener = listener;
return this;
}

public BlockchainImpl withSyncManager(SyncManager syncManager) {
this.syncManager = syncManager;
return this;
Expand Down Expand Up @@ -493,7 +502,7 @@ public synchronized Block createNewBlock(Block parent, List<Transaction> txs, Li
new byte[0], // nonce (to mine)
new byte[0], // receiptsRoot - computed after running all transactions
calcTxTrie(txs), // TransactionsRoot - computed after running all transactions
new byte[] {0}, // stateRoot - computed after running all transactions
new byte[]{0}, // stateRoot - computed after running all transactions
txs,
null); // uncle list

Expand Down Expand Up @@ -825,7 +834,7 @@ public static Set<ByteArrayWrapper> getAncestors(BlockStore blockStore, Block te
if (!isParentBlock) {
it = blockStore.getBlockByHash(it.getParentHash());
}
while(it != null && it.getNumber() >= limitNum) {
while (it != null && it.getNumber() >= limitNum) {
ret.add(new ByteArrayWrapper(it.getHash()));
it = blockStore.getBlockByHash(it.getParentHash());
}
Expand All @@ -839,7 +848,7 @@ public Set<ByteArrayWrapper> getUsedUncles(BlockStore blockStore, Block testedBl
if (!isParentBlock) {
it = blockStore.getBlockByHash(it.getParentHash());
}
while(it.getNumber() > limitNum) {
while (it.getNumber() > limitNum) {
for (BlockHeader uncle : it.getUncleList()) {
ret.add(new ByteArrayWrapper(uncle.getHash()));
}
Expand Down Expand Up @@ -953,7 +962,7 @@ private Map<byte[], BigInteger> addReward(Repository track, Block block, List<Tr
.multiply(BigInteger.valueOf(MAGIC_REWARD_OFFSET + uncle.getNumber() - block.getNumber()))
.divide(BigInteger.valueOf(MAGIC_REWARD_OFFSET));

track.addBalance(uncle.getCoinbase(),uncleReward);
track.addBalance(uncle.getCoinbase(), uncleReward);
BigInteger existingUncleReward = rewards.get(uncle.getCoinbase());
if (existingUncleReward == null) {
rewards.put(uncle.getCoinbase(), uncleReward);
Expand Down Expand Up @@ -1096,7 +1105,7 @@ private void recordBlock(Block block) {

public void updateBlockTotDifficulties(long startFrom) {
// no synchronization here not to lock instance for long period
while(true) {
while (true) {
synchronized (this) {
((IndexedBlockStore) blockStore).updateTotDifficulties(startFrom);

Expand All @@ -1120,7 +1129,7 @@ public void updateBlockTotDifficulties(long startFrom) {
}
}

if (totalDifficulty.compareTo(maxTD) < 0) {
if (totalDifficulty.compareTo(maxTD) < 0) {
blockStore.reBranch(bestStoredBlock);
bestBlock = bestStoredBlock;
totalDifficulty = maxTD;
Expand Down Expand Up @@ -1217,7 +1226,7 @@ public Iterator<BlockHeader> getIteratorOfHeadersStartFrom(BlockIdentifier ident
* Searches block in blockStore, if it's not found there
* and headerStore is defined, searches blockHeader in it.
* @param number block number
* @return Block header
* @return Block header
*/
private BlockHeader findHeaderByNumber(long number) {
Block block = blockStore.getChainBlockByNumber(number);
Expand Down Expand Up @@ -1381,7 +1390,7 @@ public byte[] next() {
}

private class State {
// Repository savedRepo = repository;
// Repository savedRepo = repository;
byte[] root = repository.getRoot();
Block savedBest = bestBlock;
BigInteger savedTD = totalDifficulty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* Created by Anton Nashatyrev on 29.12.2015.
*/
@Component
public class EventDispatchThread {
public class EventDispatchThread implements Executor {
private static final Logger logger = LoggerFactory.getLogger("blockchain");
private static EventDispatchThread eventDispatchThread;

Expand Down Expand Up @@ -67,6 +67,11 @@ public void invokeLater(Runnable r) {
return eventDispatchThread;
}

@Override
public void execute(Runnable command) {
invokeLater(command);
}

public void invokeLater(final Runnable r) {
if (executor.isShutdown()) return;
if (counter++ % 1000 == 0) logStatus();
Expand Down
Loading