Weather-Stations-Monitoring

A DDIA course project: a distributed system that fetches weather information from multiple sources, archives it, and visualizes it.


Agenda

  • System Architecture
  • Data Acquisition
  • Data Processing And Archiving
  • Data Indexing
  • How To Run

System Architecture

The system consists of 3 major components, implemented as 6 microservices:

  1. Data Acquisition.
  2. Data Processing & Archiving.
  3. Data Indexing.

Data Acquisition

Multiple weather stations feed a queueing service (Kafka) with their readings.

A) Weather Stations

  • Implemented in the weather-station service.
  • There are 2 types of stations: Mock Stations and Adapter Stations.
  • Mock Stations randomise their weather readings.
  • Adapter Stations fetch readings from Open-Meteo for a latitude and longitude given at startup.
  • Both types have the same battery-status distribution (30% low, 40% medium, 30% high) and a message-dropping percentage of 10%.
  • A station factory design pattern indicates which type of station to build.
  • A builder design pattern builds the message to be sent (see the sketch after this list).
  • Messages coming from Open-Meteo are integrated via the Adapter and Filter integration patterns.
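For illustration, here is a minimal Java sketch of how the factory and builder patterns described above could look. The class and field names (WeatherStation, MockStation, WeatherMessage, etc.) are assumptions for this sketch, not necessarily the project's actual identifiers.

```java
// Minimal sketch of the station factory and message builder (names are assumptions).
import java.util.Random;

enum BatteryStatus { LOW, MEDIUM, HIGH }

class WeatherMessage {
    final long stationId;
    final long statusTimestamp;
    final BatteryStatus batteryStatus;
    final int humidity;

    private WeatherMessage(Builder b) {
        this.stationId = b.stationId;
        this.statusTimestamp = b.statusTimestamp;
        this.batteryStatus = b.batteryStatus;
        this.humidity = b.humidity;
    }

    // Builder pattern: assembles the message field by field before it is sent to Kafka.
    static class Builder {
        private long stationId;
        private long statusTimestamp;
        private BatteryStatus batteryStatus;
        private int humidity;

        Builder stationId(long v) { stationId = v; return this; }
        Builder statusTimestamp(long v) { statusTimestamp = v; return this; }
        Builder batteryStatus(BatteryStatus v) { batteryStatus = v; return this; }
        Builder humidity(int v) { humidity = v; return this; }
        WeatherMessage build() { return new WeatherMessage(this); }
    }
}

interface WeatherStation {
    WeatherMessage nextReading();
}

// Mock station: randomises its readings; battery follows the 30/40/30 distribution.
class MockStation implements WeatherStation {
    private final long stationId;
    private final Random random = new Random();

    MockStation(long stationId) { this.stationId = stationId; }

    @Override
    public WeatherMessage nextReading() {
        double p = random.nextDouble();
        BatteryStatus battery = p < 0.3 ? BatteryStatus.LOW
                              : p < 0.7 ? BatteryStatus.MEDIUM
                              : BatteryStatus.HIGH;
        return new WeatherMessage.Builder()
                .stationId(stationId)
                .statusTimestamp(System.currentTimeMillis())
                .batteryStatus(battery)
                .humidity(random.nextInt(101))
                .build();
    }
}

// Factory pattern: decides which concrete station type to instantiate.
class StationFactory {
    static WeatherStation create(String type, long stationId) {
        switch (type) {
            case "mock":
                return new MockStation(stationId);
            // case "adapter": return new AdapterStation(stationId, lat, lon); // wraps Open-Meteo
            default:
                throw new IllegalArgumentException("Unknown station type: " + type);
        }
    }
}
```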

B) Kafka Processors

| Processing Type | Description |
| --- | --- |
| Dropping Messages | Drops a fraction of messages by probabilistic sampling. |
| Raining Areas | Detects rain based on humidity and forwards messages from rainy areas to a dedicated topic. Both processors are implemented with KStream filters. |

  • Undropped messages are produced to weather_topic, and all records with humidity >= 70 also go to the raining topic (see the sketch below).
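A minimal Kafka Streams sketch of the two processors, assuming String-serialized JSON messages with a top-level humidity field; the input topic name, the raining topic name, and the sampling rate shown are assumptions for illustration.

```java
// Sketch of the dropping and raining-areas processors using Kafka Streams.
// Topic names other than weather_topic, and the 10% sampling rate, are assumptions.
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class WeatherProcessors {

    private static final Pattern HUMIDITY = Pattern.compile("\"humidity\"\\s*:\\s*(\\d+)");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "weather-processors");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> readings = builder.stream("station_readings");

        // Dropping processor: probabilistic sampling; surviving messages go to weather_topic.
        KStream<String, String> undropped =
                readings.filter((key, value) -> ThreadLocalRandom.current().nextDouble() >= 0.10);
        undropped.to("weather_topic");

        // Raining-areas processor: records with humidity >= 70 also go to the raining topic.
        undropped.filter((key, value) -> humidityOf(value) >= 70)
                 .to("raining_topic");

        new KafkaStreams(builder.build(), props).start();
    }

    // Naive humidity extraction; the real service would parse the message properly.
    private static int humidityOf(String json) {
        Matcher m = HUMIDITY.matcher(json);
        return m.find() ? Integer.parseInt(m.group(1)) : 0;
    }
}
```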

Data Processing And Archiving

  • This is done in 2 stages:

| Stage | Description | Implementation |
| --- | --- | --- |
| 1) Data collection and archiving | Data is consumed from the Kafka topics and written in batches of 10K messages as Parquet files, to support efficient analytical queries. | base-central-station service |
| 2) Parquet file partitioning | A job scheduled via k8s to run every 24 hours at midnight, implemented in Scala using Apache Spark, partitions the Parquet files by both day and station_id (see the sketch below). | sparky-file-partition service |

Partitioned Parquet Files
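For illustration, a minimal sketch of the partitioning job. The actual sparky-file-partition service is written in Scala; this Java equivalent, along with its input/output paths and column names (day, station_id), is an assumption.

```java
// Sketch of the daily partitioning job (the real service is Scala; paths are assumptions).
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class ParquetPartitioner {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("sparky-file-partition")
                .getOrCreate();

        // Read the Parquet batches archived by the base-central-station service.
        Dataset<Row> readings = spark.read().parquet("/data/archive");

        // Re-write them partitioned by day and station_id, e.g.
        // /data/partitioned/day=2023-06-01/station_id=3/part-*.parquet
        readings.write()
                .mode(SaveMode.Append)
                .partitionBy("day", "station_id")
                .parquet("/data/partitioned");

        spark.stop();
    }
}
```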

Data Indexing

This is composed of two parts:
a) BitCask Storage
b) Elastic Search and Kibana

Bitcask Storage

  • Implemented in the bitCask module.
  • We implemented the Bitcask storage engine (from Riak) to maintain an up-to-date store of each station's latest status, as discussed in the book, with a few notes:
    • Compaction is scheduled over segment files to avoid disrupting active readers.

    • No checksums are implemented to detect errors.

    • No tombstones are implemented for deletions, since there is no point in deleting a weather station's entry; replica files are simply deleted during compaction.

    • Here is the structure of an entry in the segment files (an encoding sketch follows this list):

      | Field | timestamp | key_size | value_size | key | value |
      | --- | --- | --- | --- | --- | --- |
      | Size | 8 bytes | 4 bytes | 4 bytes | key_size bytes | value_size bytes |
    • Classes

      | Class | Description |
      | --- | --- |
      | BitCask | Provides the API for writing and reading keys and values. |
      | ActiveSegment | Handles the currently active segment file. |
      | Compaction Task | Contains the following two runnable classes and uses a timer schedule to run them periodically. |
      | Compact | Compacts segments periodically. |
      | deleteIfStale | Deletes a file if it is marked as stale (i.e., it has already been compacted). |
      | BitCaskLock | Ensures a single writer at a time for the active file. |
      | SegmentsReader | Given a hash-table entry and a segment file path, returns the value corresponding to that key. |
      | SegmentsWriter | Given a key, a value, and a segment file path, writes the key/value pair into the segment. |
      | Pointer | Points to an entry; holds two attributes: filePath and ByteOffset. |
    • Crash Recovery Mechanism

      • Create a new hash map.
      • Read the active file from start to end and add each key and its pointer-to-value to the hash table.
      • Read the hint files from end to start and fill the hash table with the remaining key/value pairs.
    • Compaction Mechanism

      • Loop over all replica files; read each one from start to end and add its key/value pairs to a hash map.
      • Loop over the hash table and write each key/value pair as an entry in a compacted file.
      • Mark the replica file as stale.
      • A separate scheduled task, deleteIfStale, deletes a replica file once it is marked as STALE.
    • Multi-Writer Concurrency Mechanism

      Implemented using a WriteLockFile to allow multiple bitCask instances to write concurrently to the same shared storage.

      • When a writer acquires the lock, a WriteLockFile is created in the segments path.
      • When another writer wants to write to the active file, it checks for the WriteLockFile; if it exists, the writer waits until the file is deleted.
      • When the first writer finishes, it deletes the WriteLockFile.
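Below is a minimal sketch of how an entry with the layout shown above can be encoded and decoded. It is illustrative only, assuming the field order in the table; it is not the project's actual SegmentsWriter/SegmentsReader code.

```java
// Sketch of the segment-entry layout: timestamp, key_size, value_size, key, value.
import java.nio.ByteBuffer;

public class SegmentEntryCodec {

    public static byte[] encode(long timestamp, byte[] key, byte[] value) {
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + 4 + key.length + value.length);
        buffer.putLong(timestamp);   // 8 bytes
        buffer.putInt(key.length);   // 4 bytes: key_size
        buffer.putInt(value.length); // 4 bytes: value_size
        buffer.put(key);             // key_size bytes
        buffer.put(value);           // value_size bytes
        return buffer.array();
    }

    // Decodes one entry starting at the given byte offset and returns the value bytes,
    // mirroring what a reader would do when following a (filePath, byteOffset) pointer.
    public static byte[] decodeValue(byte[] segment, int offset) {
        ByteBuffer buffer = ByteBuffer.wrap(segment, offset, segment.length - offset);
        buffer.getLong();                              // skip timestamp
        int keySize = buffer.getInt();
        int valueSize = buffer.getInt();
        buffer.position(buffer.position() + keySize);  // skip key
        byte[] value = new byte[valueSize];
        buffer.get(value);
        return value;
    }
}
```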

Elastic Search and Kibana

  • Implemented in elastic-search-and-kibana.

  • We implemented a Python script that listens to a Kafka topic (paths_topic) to which the base central station sends the paths of newly created Parquet files.

  • The script then loads each Parquet file, converts it into records, and connects to Elasticsearch to upload the records.

  • Kibana visualisations of the battery status of some stations confirm the expected battery distribution (30% low, 40% medium, 30% high).

  • Kibana visualisations of the percentage of dropped messages per station confirm the required drop rate of 10%.

How To Run

This project is designed to be deployed on Kubernetes, so follow these steps to run it:

  1. Build a Docker image from the Dockerfile in each service, using the following names:

     | Service | Image name:tag |
     | --- | --- |
     | weather-station | weather-station-image:latest |
     | kafka-processor | kafka-processor-image:latest |
     | base-central-station | base-central-station-image:latest |
     | sparky-file-partition | sparky-file-partition-image:latest |
     | elastic-search-and-kibana | elastic-loader-image:latest |

  2. Load the Docker images into the k8s cluster:

     ```
     minikube image load <image name>
     ```

  3. Apply all k8s YAML files in the k8s directory.