This repository has been archived by the owner on Mar 12, 2024. It is now read-only.

RxJs and Reactive Programming #14

Open
reboottime opened this issue Nov 12, 2023 · 5 comments

Comments

@reboottime
Owner

reboottime commented Nov 12, 2023

Overview

Introduction

This article attempts to capture the essentials of RxJS in a short time. The content covers two parts: reactive programming and RxJS itself.


Because RxJS describes itself as a library for reactive programming using Observables, this article starts with the essentials of reactive programming.


Quick References

@reboottime
Owner Author

reboottime commented Nov 12, 2023

Reactive Programming, Part I: Core Concepts

Definition and Application Scenarios

  • What is it: Reactive Programming is a programming paradigm centered around dealing with data streams and the propagation of change.
  • When to use it: It's particularly useful in scenarios where data is dynamic and changes over time, such as in applications with real-time features, asynchronous processing, or complex event handling. Here are the key aspects of Reactive Programming.

Core Concepts

  • Data Streams: In Reactive Programming, everything can be represented as a stream of data. This includes variables, user inputs, properties, caches, data structures, etc. A stream can emit multiple values over time, unlike a traditional function that returns a single value.

  • Asynchronous Data Flow: Data streams are inherently asynchronous. Reactive Programming handles data that flows over time and allows for asynchronous data handling, making it ideal for applications that deal with events, real-time updates, and asynchronous computations.

  • Reactive Systems: These systems are responsive, resilient, elastic, and message-driven. They react to changes in the input data and provide real-time feedback based on user interactions or other input. (I do not yet understand this deeply; I need use cases and practice to grasp its implications.)

  • Observer Pattern: This pattern is fundamental in Reactive Programming. Observers subscribe to data streams and react to emitted values. The pattern consists of three parts:

    • the subject (stream),
    • observers (subscribers),
    • and the subscription.
  • Functional Programming Techniques: Reactive Programming often leverages functional programming concepts like

    • immutability,
    • pure functions,
    • and higher-order functions.

These principles make it easier to build applications that are free of side effects and scale well.
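The three functional techniques listed above can be sketched in a few lines of TypeScript. This is only an illustration: the `PriceTick` type, the `applyDiscount` and `forEachTick` helpers, and the sample values are hypothetical and not part of any library.

```typescript
// Hypothetical event type, used only for this illustration.
interface PriceTick {
  readonly symbol: string;
  readonly price: number;
}

// Pure function: the same input always gives the same output, no side effects.
const applyDiscount = (tick: PriceTick, rate: number): PriceTick => ({
  ...tick, // immutability: build a copy instead of mutating the input
  price: tick.price * (1 - rate),
});

// Higher-order function: takes a function and returns a new function.
const forEachTick =
  (handler: (tick: PriceTick) => PriceTick) =>
  (input: readonly PriceTick[]): PriceTick[] =>
    input.map(handler);

const originalTicks: readonly PriceTick[] = [
  { symbol: "AAA", price: 100 },
  { symbol: "BBB", price: 50 },
];

const halfOff = forEachTick((tick) => applyDiscount(tick, 0.5));
const discountedTicks = halfOff(originalTicks);

console.log(discountedTicks.map((t) => t.price)); // [ 50, 25 ]
console.log(originalTicks.map((t) => t.price)); // [ 100, 50 ], the source data is untouched
```

Because no step mutates its input, the same transformation can be applied to every value a stream emits without one subscriber affecting another.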

@reboottime
Owner Author

reboottime commented Nov 12, 2023

Reactive Programming, Part II: Observer Pattern

The concepts and their associated code patterns discussed below were learned from Derek Banas's tutorial on the Observer Design Pattern, available at this link.

Core concepts and application of Observer Pattern

  • Core concepts

    • Subject (publisher)
    • Observer (subscriber)
  • When to Use the Observer Pattern

    • When many objects need to receive an update whenever another object changes.
    • Example: a stock market feed with thousands of stocks needs to send updates to the objects representing individual stocks.
    • The Subject (publisher) sends many stocks to the Observers.
    • The Observers (subscribers) take the ones they want and use them.

Sample code for the Observer Pattern (using TypeScript)

  • interfaces for subject and observer
interface Observer {
    notify: (data: any) => void;
}

interface Subject {
    attach: (observer: Observer) => void;
    detach: (observer: Observer) => void;
    notifyObservers: () => void;
}
  • Subject code
class ConcreteSubject implements Subject {
    private observers: Observer[] = [];

    // Any state changes that the observers should know about
    private state: any;

    public attach(observer: Observer): void {
        const isExist = this.observers.includes(observer);
        if (isExist) {
            return console.log('Subject: Observer has been attached already.');
        }

        console.log('Subject: Attached an observer.');
        this.observers.push(observer);
    }

    public detach(observer: Observer): void {
        const observerIndex = this.observers.indexOf(observer);
        if (observerIndex === -1) {
            return console.log('Subject: Nonexistent observer.');
        }

        this.observers.splice(observerIndex, 1);
        console.log('Subject: Detached an observer.');
    }

    public notifyObservers(): void {
        console.log('Subject: Notifying observers...');
        for (const observer of this.observers) {
            observer.notify(this.state);
        }
    }

    // Usually, the subscription logic is only a fraction of what a Subject can do.
    // Subjects commonly hold some important business logic, that triggers a notification
    // method whenever something important is about to happen (or after it).
    public someBusinessLogic(): void {
        console.log('\nSubject: I\'m doing something important.');
        this.state = Math.floor(Math.random() * 10);

        console.log(`Subject: My state has just changed to: ${this.state}`);
        this.notifyObservers();
    }
}
  • Observer example code
class ConcreteObserver implements Observer {
    // Receive update from subject
    public notify(data: any): void {
        console.log(`Observer: Reacted to the event with data: ${data}`);
    }
}
  • Sample code on applying both observer and subject
const subject = new ConcreteSubject();

const observer1 = new ConcreteObserver();
subject.attach(observer1);

const observer2 = new ConcreteObserver();
subject.attach(observer2);

subject.someBusinessLogic();
subject.someBusinessLogic();

subject.detach(observer2);

subject.someBusinessLogic();
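Part I lists the subscription as the third part of the pattern, while the sample above detaches observers by passing the observer back to the subject. A minimal sketch of the alternative, where subscribing returns a handle that can cancel itself, might look like the following. The names `TypedSubject`, `SubscriptionHandle`, and `Listener` are hypothetical, and the generic parameter replaces `any` only for illustration.

```typescript
type Listener<T> = (value: T) => void;

// The subscription: an object whose only job is to cancel the link.
interface SubscriptionHandle {
  unsubscribe(): void;
}

class TypedSubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): SubscriptionHandle {
    this.listeners.push(listener);
    return {
      unsubscribe: () => {
        this.listeners = this.listeners.filter((l) => l !== listener);
      },
    };
  }

  next(value: T): void {
    // Iterate over a copy so a listener may unsubscribe while being notified.
    for (const listener of this.listeners.slice()) {
      listener(value);
    }
  }
}

const priceSubject = new TypedSubject<number>();
const receivedPrices: number[] = [];

const handle = priceSubject.subscribe((price) => receivedPrices.push(price));

priceSubject.next(1);
priceSubject.next(2);
handle.unsubscribe();
priceSubject.next(3); // not delivered, the listener is gone

console.log(receivedPrices); // [ 1, 2 ]
```

With this shape the caller does not need to keep a reference to both the subject and the observer in order to stop listening; the handle alone is enough.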

@reboottime
Owner Author

reboottime commented Nov 12, 2023

RxJs, Part I: Terminologies

RxJS terminology:

  • Observable: a source that emits data or events over time.
  • Observer: receives and acts upon the data emitted by an Observable.
  • Operators: functions that transform, filter, or otherwise manipulate the data emitted by an Observable; they are applied through the pipe method.
  • Pipe: the mechanism that chains Operators onto an Observable so that emitted data is processed in a fixed sequence.
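To make the four terms concrete, here is a toy, dependency-free sketch of how they fit together. It is not RxJS's actual implementation: `ToyObservable`, `ToyObserver`, `toyMap`, and `toyFilter` are hypothetical names, and error handling and unsubscription are left out on purpose. The point it illustrates is that an operator is a function from one observable to another, and pipe simply applies such functions in order.

```typescript
interface ToyObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}

// An operator takes a source observable and returns a new one.
type ToyOperator<A, B> = (source: ToyObservable<A>) => ToyObservable<B>;

class ToyObservable<T> {
  private readonly producer: (observer: ToyObserver<T>) => void;

  constructor(producer: (observer: ToyObserver<T>) => void) {
    this.producer = producer;
  }

  // Each subscribe call runs the producer for that observer.
  subscribe(observer: ToyObserver<T>): void {
    this.producer(observer);
  }

  // Pipe: feed the observable through each operator, left to right.
  pipe<R>(...operators: ToyOperator<any, any>[]): ToyObservable<R> {
    return operators.reduce((source, op) => op(source), this as ToyObservable<any>);
  }
}

const toyMap = <A, B>(fn: (value: A) => B): ToyOperator<A, B> =>
  (source) =>
    new ToyObservable<B>((observer) =>
      source.subscribe({
        next: (value) => observer.next(fn(value)),
        complete: () => observer.complete?.(),
      })
    );

const toyFilter = <A>(predicate: (value: A) => boolean): ToyOperator<A, A> =>
  (source) =>
    new ToyObservable<A>((observer) =>
      source.subscribe({
        next: (value) => {
          if (predicate(value)) observer.next(value);
        },
        complete: () => observer.complete?.(),
      })
    );

// Observable: emits four order numbers, then completes.
const orderNumbers = new ToyObservable<number>((observer) => {
  [1, 2, 3, 4].forEach((n) => observer.next(n));
  observer.complete?.();
});

const orderLabels: string[] = [];

orderNumbers
  .pipe<string>(
    toyFilter((n: number) => n % 2 === 0),
    toyMap((n: number) => `order #${n}`)
  )
  // Observer: collects the labels and prints them on completion.
  .subscribe({
    next: (label) => orderLabels.push(label),
    complete: () => console.log(orderLabels), // [ 'order #2', 'order #4' ]
  });
```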

Take the burger shop (as in this tutorial) as an analogy:

  • Observable and the Observer
  • The pipe and the operators

@reboottime
Owner Author

reboottime commented Nov 13, 2023

RxJs, Part II: Code Pattern

const { Observable } = require("rxjs");
const { pluck, map, filter } = require("rxjs/operators");

const users = {
  data: [
    {
      id: 1,
      status: "active",
      age: 14,
    },
    {
      id: 1,
      status: "inactive",
      age: 12,
    },
    {
      id: 1,
      status: "active",
      age: 42,
    },
    {
      id: 1,
      status: "inactive",
      age: 42,
    },
    {
      id: 1,
      status: "active",
      age: 13,
    },
    {
      id: 1,
      status: "inactive",
      age: 75,
    },
    {
      id: 1,
      status: "inactive",
      age: 43,
    },
    {
      id: 1,
      status: "inactive",
      age: 54,
    },
    {
      id: 1,
      status: "active",
      age: 7,
    },
    {
      id: 1,
      status: "active",
      age: 17,
    },
  ],
};

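const observable = new Observable((subscriber) => {
  subscriber.next(users);
  // Signal the end of the stream so the observers' complete handlers run.
  subscriber.complete();
}).pipe(
  // pluck is deprecated in RxJS 7; map does the same job here.
  map((response) => response.data),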
  filter((users) => users.length >= 10),
  map((users) => {
    return users.filter((user) => user.status === "active");
  }),
  map((users) => {
    return users.reduce((sum, user) => sum + user.age, 0) / users.length;
  }),
  map((average) => {
    if (average < 18) throw new Error(`Average age is too small (${average})`);
    else return average;
  }),
  map((average) => `The average age is ${average}`)
);

const observer = {
  next: (x) => console.log("Observer got a next value: " + x),
  error: (err) => console.error("Observer got an error: " + err),
  complete: () => console.log("Observer got a complete notification"),
};
const observer2 = {
  next: (x) => console.log("Observer 2 got a next value: " + x),
  error: (err) => console.error("Observer 2 got an error: " + err),
  complete: () => console.log("Observer 2 got a complete notification"),
};

observable.subscribe(observer);

observable.subscribe(observer2);
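To see which value the pipeline above emits, the same filter and average steps can be traced with plain array operations, without RxJS. This is only a worked check of the arithmetic; the `SampleUser` type and the variable names are hypothetical, and the statuses and ages are copied from the sample data.

```typescript
interface SampleUser {
  status: "active" | "inactive";
  age: number;
}

// Same statuses and ages as the users in the sample data.
const sampleUsers: SampleUser[] = [
  { status: "active", age: 14 },
  { status: "inactive", age: 12 },
  { status: "active", age: 42 },
  { status: "inactive", age: 42 },
  { status: "active", age: 13 },
  { status: "inactive", age: 75 },
  { status: "inactive", age: 43 },
  { status: "inactive", age: 54 },
  { status: "active", age: 7 },
  { status: "active", age: 17 },
];

// Step 1: the list has 10 entries, so it passes the length >= 10 filter.
// Step 2: keep only the active users.
const activeUsers = sampleUsers.filter((user) => user.status === "active");

// Step 3: average their ages.
const ageSum = activeUsers.reduce((sum, user) => sum + user.age, 0);
const averageAge = ageSum / activeUsers.length;

// Step 4: the average is not below 18, so no error is thrown.
const emittedMessage = `The average age is ${averageAge}`;

console.log(activeUsers.length); // 5
console.log(ageSum); // 93
console.log(emittedMessage); // The average age is 18.6
```

Since an Observable built with `new Observable(...)` runs its producer once per subscription, both observers should receive this same message, each through its own `next` handler.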

@reboottime
Owner Author

reboottime commented Nov 13, 2023
