DDIA course project about a distributed system that fetches weather information from multiple sources, archives and visualizes it.
There are 3 major components implemented using 6 microservices.
The 3 major components are:
Multiple weather stations that feed a queueing service (Kafka) with their readings.
- Implemented in weather-station service
- We have 2 types of stations: Mock Stations and Adapter Stations.
- Mock Stations randomise their weather readings.
- Adapter Stations fetch results from Open-Meteo for a latitude and longitude given at startup.
- Both types have the same battery status distribution (30% low, 40% medium, 30% high) and a message dropping percentage of 10%.
- We use a station factory design pattern to indicate which type of station we would like to build.
- We use a builder design pattern to build the message to be sent (see the sketch below).
- Messages coming from Open-Meteo are integrated following the Adapter and Filter integration pattern.
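
A minimal sketch of how the station factory and the message builder might fit together. The class, method, and field names here are illustrative assumptions, not the project's actual API, and the Open-Meteo call is stubbed out:

```java
// Illustrative sketch of the station factory and the message builder (names are assumptions).
import java.util.Random;

interface WeatherStation {
    WeatherMessage read();                      // produce one reading
}

final class WeatherMessage {
    final long stationId;
    final String batteryStatus;                 // "low" | "medium" | "high"
    final int humidity;

    private WeatherMessage(Builder b) {
        this.stationId = b.stationId;
        this.batteryStatus = b.batteryStatus;
        this.humidity = b.humidity;
    }

    // Builder pattern: assembles the message field by field before it is sent to Kafka.
    static final class Builder {
        private long stationId;
        private String batteryStatus;
        private int humidity;

        Builder stationId(long id)      { this.stationId = id; return this; }
        Builder batteryStatus(String s) { this.batteryStatus = s; return this; }
        Builder humidity(int h)         { this.humidity = h; return this; }
        WeatherMessage build()          { return new WeatherMessage(this); }
    }
}

final class MockStation implements WeatherStation {
    private final long id;
    private final Random rnd = new Random();
    MockStation(long id) { this.id = id; }

    @Override public WeatherMessage read() {
        // 30% low / 40% medium / 30% high battery status distribution.
        double p = rnd.nextDouble();
        String battery = p < 0.3 ? "low" : (p < 0.7 ? "medium" : "high");
        return new WeatherMessage.Builder()
                .stationId(id)
                .batteryStatus(battery)
                .humidity(rnd.nextInt(101))     // randomised reading
                .build();
    }
}

final class AdapterStation implements WeatherStation {
    private final long id;
    private final double lat, lon;
    AdapterStation(long id, double lat, double lon) { this.id = id; this.lat = lat; this.lon = lon; }

    @Override public WeatherMessage read() {
        // The real adapter calls Open-Meteo for (lat, lon); stubbed here to keep the sketch self-contained.
        return new WeatherMessage.Builder().stationId(id).batteryStatus("medium").humidity(50).build();
    }
}

// Factory pattern: decides which concrete station type to build.
final class StationFactory {
    static WeatherStation create(String type, long id, double lat, double lon) {
        switch (type) {
            case "mock":    return new MockStation(id);
            case "adapter": return new AdapterStation(id, lat, lon);
            default:        throw new IllegalArgumentException("unknown station type: " + type);
        }
    }
}
```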
- Implemented in kafka-processing service.
- There are two types of processing, following the router integration pattern:
Processing Type | Description
---|---
Dropping Messages | Processes messages by probabilistic sampling, then throws some of them away.
Raining Areas | Processes messages and detects rain according to humidity, then routes messages from rainy areas to a dedicated topic. We use KStream filters to build these processors.
- All these processors send undropped messages to weather_topic, and all records with humidity >= 70 also go to the raining topic (see the sketch below).
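
A minimal Kafka Streams sketch of the two routers described above. The input topic name, the raining topic name, and the sampling probability are assumptions; only weather_topic and the humidity >= 70 rule come from the text:

```java
// Sketch of the router-style processing with Kafka Streams.
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class WeatherStreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        Random rnd = new Random();
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> readings = builder.stream("station_readings"); // assumed input topic

        // Dropping Messages: probabilistic sampling, some messages are thrown away.
        KStream<String, String> undropped = readings.filter((key, value) -> rnd.nextDouble() >= 0.1);
        undropped.to("weather_topic");

        // Raining Areas: records with humidity >= 70 are also routed to the raining topic.
        undropped.filter((key, value) -> extractHumidity(value) >= 70)
                 .to("raining_topic");                      // assumed topic name

        new KafkaStreams(builder.build(), props).start();
    }

    // Toy humidity extraction from a JSON value; the real service parses the full message schema.
    private static int extractHumidity(String json) {
        java.util.regex.Matcher m =
                java.util.regex.Pattern.compile("\"humidity\"\\s*:\\s*(\\d+)").matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : 0;
    }
}
```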
- Data archiving is done in 2 stages:
Point | Description | Implementation
---|---|---
1) Data Collection and Archiving | Data is consumed from Kafka topics and written in batches of 10k messages as parquet files to support efficient analytical queries. | base-central-station service
2) Parquet Files Partitioning | A cron job, scheduled with k8s to run every 24 hours at midnight and implemented in Scala using Apache Spark, partitions the messages in the parquet files by both day and station_id (see the sketch below). | sparky-file-partition service
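
The partitioning job itself is written in Scala; the sketch below shows the equivalent logic through Spark's Java API. The input and output paths are placeholders, and a `day` column (derived from the reading timestamp) is assumed to exist alongside `station_id`:

```java
// Sketch of the daily partitioning job using Spark's Java API (the real job is in Scala).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkyFilePartition {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("sparky-file-partition")
                .getOrCreate();

        // Read the parquet batches archived by the base central station.
        Dataset<Row> readings = spark.read().parquet("/data/archive");

        // Re-write them partitioned by both day and station_id for efficient analytical queries.
        readings.write()
                .mode(SaveMode.Overwrite)
                .partitionBy("day", "station_id")
                .parquet("/data/partitioned");

        spark.stop();
    }
}
```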
The indexing and visualisation component is composed of two parts:
a) BitCask Storage
b) Elasticsearch and Kibana
- Implemented in bitCask.
- We implemented the Riak BitCask log-structured store to maintain an updated store of each station's status, as discussed in the book, with some notes:
  - Compaction over segment files is scheduled so as not to disrupt active readers.
  - No checksums are implemented to detect errors.
  - No tombstones are implemented for deletions, as there is no point in deleting a weather station ID entry; we simply delete replica files on compaction.
- Here is the structure of an entry in the segment files:

ENTRY | SIZE
---|---
timestamp | 8 bytes
key_size | 4 bytes
value_size | 4 bytes
key | key_size bytes
value | value_size bytes

Class | Description
---|---
BitCask | Provides the API for writing and reading keys and values.
ActiveSegment | Handles the currently active segment file.
Compaction Task | Contains the following two runnable classes and uses a timer schedule to run them periodically.
Compact | Used to compact segments periodically.
deleteIfStale | Used to delete a file if it's marked as a stale file (a file that was compacted).
BitCaskLock | Used to ensure one writer at a time for the active file.
SegmentsReader | Given the hash table entry and segment file path, returns the value corresponding to this key.
SegmentsWriter | Given a key, value, and segment file path, writes the key and value into the segment.
Pointer | Points to an entry and contains two attributes: filePath and byteOffset.
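
A sketch of how an entry with the layout above could be appended to a segment file. This is a simplified stand-in for SegmentsWriter; the file handling details are assumptions:

```java
// Simplified stand-in for SegmentsWriter: appends one entry using the layout above.
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SegmentEntryWriter {
    /** Appends an entry to the segment file and returns its byte offset (used by Pointer). */
    public static long append(String segmentPath, String key, String value) throws IOException {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);

        // 8-byte timestamp + 4-byte key_size + 4-byte value_size + key + value
        ByteBuffer entry = ByteBuffer.allocate(8 + 4 + 4 + k.length + v.length);
        entry.putLong(System.currentTimeMillis());
        entry.putInt(k.length);
        entry.putInt(v.length);
        entry.put(k);
        entry.put(v);

        try (RandomAccessFile file = new RandomAccessFile(segmentPath, "rw")) {
            long offset = file.length();
            file.seek(offset);            // append at the end of the active segment
            file.write(entry.array());
            return offset;                // stored in the in-memory hash table as Pointer.byteOffset
        }
    }
}
```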
- Rebuilding the in-memory hash table on startup:
  - Create a new hashMap.
  - Read the active file from start to end, and add (key, pointer-to-value) pairs to the hash table.
  - Read the hint files from end to start, and fill the hash table with key-value pairs.
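
A sketch of the startup scan over the active file under the same entry layout. The Pointer shape is an assumption, and the hint-file pass is omitted:

```java
// Rebuilds the in-memory hash table by scanning the active segment file from start to end.
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HashTableRecovery {
    record Pointer(String filePath, long byteOffset) {}

    public static Map<String, Pointer> recover(String activeFilePath) throws IOException {
        Map<String, Pointer> hashTable = new HashMap<>();
        try (RandomAccessFile file = new RandomAccessFile(activeFilePath, "r")) {
            long offset = 0;
            while (offset < file.length()) {
                file.seek(offset);
                file.readLong();                          // timestamp (not needed for the pointer)
                int keySize = file.readInt();
                int valueSize = file.readInt();
                byte[] key = new byte[keySize];
                file.readFully(key);
                // Later entries overwrite earlier ones, so the map ends up with the latest pointer.
                hashTable.put(new String(key, StandardCharsets.UTF_8),
                              new Pointer(activeFilePath, offset));
                offset += 8 + 4 + 4 + keySize + valueSize; // skip over the value to the next entry
            }
        }
        return hashTable;
    }
}
```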
- Compaction:
  - Loop over all replica files, read each replica file from start to end, and add its key-value pairs to the hash map.
  - Loop over the hash table and write each key-value pair as an entry in a compacted file.
  - Mark the replica file as stale.
  - Another scheduled task, deleteIfStale, deletes a replica file if it's marked as a STALE file.
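
A sketch of a compaction pass over replica files. The file naming, the STALE marker, and the line-based "key=value" format used here (instead of the binary entry layout) are simplifying assumptions:

```java
// Sketch of a compaction pass: merge replica files into one compacted file, keeping the
// latest value per key, then mark the originals as stale for the deleteIfStale task.
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionSketch {
    public static void compact(Path segmentsDir) throws IOException {
        Map<String, String> latest = new LinkedHashMap<>();

        // 1) Loop over all replica files, start to end, so later entries overwrite earlier ones.
        File[] replicas = segmentsDir.toFile().listFiles((dir, name) -> name.startsWith("replica"));
        if (replicas == null) return;
        for (File replica : replicas) {
            for (String line : Files.readAllLines(replica.toPath())) {
                int sep = line.indexOf('=');
                if (sep > 0) latest.put(line.substring(0, sep), line.substring(sep + 1));
            }
        }

        // 2) Write every surviving key-value pair as an entry in a single compacted file.
        Path compacted = segmentsDir.resolve("compacted-" + System.currentTimeMillis());
        StringBuilder out = new StringBuilder();
        latest.forEach((k, v) -> out.append(k).append('=').append(v).append('\n'));
        Files.writeString(compacted, out.toString());

        // 3) Mark the replica files as stale; the scheduled deleteIfStale task removes them later.
        for (File replica : replicas) {
            Files.createFile(segmentsDir.resolve(replica.getName() + ".STALE"));
        }
    }
}
```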
- Writer locking:
  - Implemented using a WriteLockFile, to allow for concurrency over multiple bitCask instances on the same shared storage.
  - When a writer acquires the lock, the WriteLockFile is created in the segments path.
  - When another writer wants to write to the active file, it checks for the WriteLockFile; if it exists, it waits until the file is deleted.
  - When the other writer finishes, it deletes the WriteLockFile.
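
A sketch of the WriteLockFile protocol. The lock file name matches the description above; the polling interval is an assumption:

```java
// One writer at a time across bitCask instances sharing the same segments directory.
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriteLock {
    private final Path lockFile;

    public WriteLock(Path segmentsDir) {
        this.lockFile = segmentsDir.resolve("WriteLockFile");
    }

    /** Blocks until the lock file can be created, i.e. no other writer holds the lock. */
    public void acquire() throws IOException, InterruptedException {
        while (true) {
            try {
                Files.createFile(lockFile);   // atomic: fails if another writer already created it
                return;
            } catch (FileAlreadyExistsException e) {
                Thread.sleep(100);            // wait until the current writer deletes the file
            }
        }
    }

    /** Releases the lock by deleting the lock file. */
    public void release() throws IOException {
        Files.deleteIfExists(lockFile);
    }
}
```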
- Implemented in elastic-search-and-kibana.
- We implemented a Python script listening to a Kafka topic (paths_topic) to which the base central station sends the paths of newly created parquet files.
- The script then loads each parquet file, converts it to records, and connects to Elasticsearch to upload the records (see the sketch after this list).
- Kibana visualisations of the battery status of some stations confirm the expected battery distribution.
- Kibana visualisations of the percentage of dropped messages from stations confirm the required 10%.
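
The loader is a Python script in the project; the sketch below shows the same flow using Java clients, purely for illustration. The paths_topic name comes from the text, while the Elasticsearch host, index name, and field handling are assumptions:

```java
// Illustrative Java sketch of the elastic-loader flow (the real loader is a Python script).
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class ElasticLoader {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "elastic-loader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             RestHighLevelClient es = new RestHighLevelClient(
                     RestClient.builder(new HttpHost("elasticsearch", 9200, "http")))) {

            // Listen on paths_topic for the paths of newly created parquet files.
            consumer.subscribe(Collections.singletonList("paths_topic"));
            while (true) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                    indexParquetFile(es, rec.value());
                }
            }
        }
    }

    // Load one parquet file, convert each row to a map of fields, and bulk-index it.
    static void indexParquetFile(RestHighLevelClient es, String parquetPath) throws Exception {
        BulkRequest bulk = new BulkRequest();
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(new org.apache.hadoop.fs.Path(parquetPath)).build()) {
            GenericRecord row;
            while ((row = reader.read()) != null) {
                Map<String, Object> doc = new HashMap<>();
                for (Schema.Field field : row.getSchema().getFields()) {
                    Object v = row.get(field.name());
                    doc.put(field.name(), v == null ? null : v.toString());
                }
                bulk.add(new IndexRequest("weather_statuses").source(doc)); // assumed index name
            }
        }
        if (bulk.numberOfActions() > 0) {
            es.bulk(bulk, RequestOptions.DEFAULT);
        }
    }
}
```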
This project is designed to be deployed on Kubernetes, so follow these steps to run it:

- Build a docker image from the dockerfile in each service with the following names:

Service | Image Name:Version
---|---
weather-station | weather-station-image:latest
kafka-processor | kafka-processor-image:latest
base-central-station | base-central-station-image:latest
sparky-file-partition | sparky-file-partition-image:latest
elastic-search-and-kibana | elastic-loader-image:latest

- Use the following command to load the docker images into k8s:

  minikube image load <image name>

- Apply all k8s YAML files in the k8s directory.
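
For example, assuming minikube and kubectl are installed and the images above are built locally:

  minikube image load weather-station-image:latest
  kubectl apply -f k8s/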