Angular Observables Tutorial

In this Angular tutorial we learn about Observables and Reactive programming with RxJS to manage asynchronous data.

We learn how to create Observables with constructors or operators and the differences between them.

What are Observables

An observable is a way to deliver data that arrives over time from a producer to one or more consumers. It’s often described as a more capable alternative to a promise and is used extensively throughout Angular.

Where a promise can only return a single value, an observable can return a stream of values. A promise cannot be cancelled, but an observable can be.

Observables aren’t specific to Angular. Angular uses the implementation from the RxJS library. An Observable type has also been proposed for JavaScript itself, but it is not part of the ES7 (ES2016) standard.

Observables use the publish and subscribe model. An observable wraps a source of data as a stream, emits the values from that stream, and signals any error or the completion of the stream.

On its own an observable is not very useful. Something needs to consume the data that the observable emits, and these consumers are called observers.

An observable (publisher) will only execute and emit data once an observer (subscriber) subscribes to it. The communication between the observable and the observer is done using three callbacks.

  • next()
  • error()
  • complete()
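
As a preview of how the three callbacks fit together, here is a minimal sketch, assuming RxJS is installed. The names greetings and log are made up for illustration. The observable emits two values and then completes, and the observer handles each signal.

```typescript
import { Observable } from 'rxjs';

// Collects everything the observer receives, in order.
const log: string[] = [];

// Hypothetical observable: emits two values, then signals completion.
const greetings = new Observable<string>((observer) => {
  observer.next('Hello');
  observer.next('there');
  observer.complete();
});

// The observer is an object with up to three callbacks.
greetings.subscribe({
  next: (value) => log.push(`next: ${value}`),
  error: (err) => log.push(`error: ${err}`),
  complete: () => log.push('complete'),
});

console.log(log);
```

Because this stream never fails, the error callback is never invoked here.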

Reactive Programming and RxJS

Reactive programming is programming with asynchronous data streams. It’s about creating streams, emitting values, errors or completion signals, and manipulating the data in those streams.

RxJS (Reactive Extensions Library for JavaScript) is a JavaScript library that allows us to work with asynchronous data streams easily. Angular uses RxJS heavily to implement reactive programming.
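
To illustrate what manipulating a stream can look like, the sketch below pipes a stream of numbers through two RxJS operators. It assumes RxJS 7.2 or later, where operators can be imported directly from 'rxjs'. The results array is only there to collect the output.

```typescript
import { of, filter, map } from 'rxjs';

// Collects the values that come out of the pipeline.
const results: number[] = [];

// Source stream of the numbers 1 to 5.
of(1, 2, 3, 4, 5)
  .pipe(
    filter((n) => n % 2 === 1), // keep the odd numbers: 1, 3, 5
    map((n) => n * 10),         // transform each value: 10, 30, 50
  )
  .subscribe((value) => results.push(value));

console.log(results); // [10, 30, 50]
```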

Following are some examples of where reactive programming is used in Angular:

  • Value and state changes in forms.
  • The Router and Forms modules use observables to listen for, and respond to, user input events.
  • Custom events can send observable output from a child component to a parent.
  • The HTTP module uses observables to handle AJAX requests and responses.

RxJS has two main players.

  • Observable
  • Observer (Subscriber)

Lesson project setup

For this lesson there is no initial setup other than having an app.

To keep things simple, we will be working directly from inside the main ‘app.component’ component and will create whatever else we need on a step-by-step basis.

How to create an Observable with the constructor

An observable will start to emit its data as soon as an observer subscribes to it.

The observable will invoke the next() callback when a value arrives in the stream, passing this value as the argument to the callback.

If an error occurs, the error() callback is invoked. When the stream completes, the complete() callback is invoked.

Note that all three of these callbacks are optional.

Now that we know how it works, let’s create a simple observable as a demonstration. We will use the main ‘app’ component.

1. RxJS comes bundled with Angular when we install it, but we still have to include it in the component where we want to use it.

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {}

2. The simplest way to create an observable in Angular is to use the Observable constructor.

As an argument, the constructor takes the function we want to execute when an observer subscribes to it. This can be an arrow function.

Syntax: arrow function
object_name = new Observable((observer) => {
  // Logic
})

Or it can be a reference to a standalone function.

Syntax: standard function
object_name = new Observable(function_name);

function function_name(observer) {
  // Logic
}

In both cases the function receives an observer object as its parameter. We call the callback methods on that object with dot notation.

Example: arrow function
object_name = new Observable((observer) => {
  observer.next();
})

Example: standard function
object_name = new Observable(function_name);

function function_name(observer) {
  observer.next();
}

We will use arrow functions for the demonstration.

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  myObservable = new Observable((observer) => {
    console.log('Starting observable');

    observer.next('1');
    observer.next('2');
    observer.next('3');
    observer.next('4');
    observer.next('5');
  });
}

The example above creates an observable that emits the values 1 to 5 as strings.

‘myObservable’ has now been created, but the function we passed to the constructor hasn’t run yet. To make it emit those values, we need to subscribe to it.

3. To subscribe to an observable, we call the subscribe() method on its object. Typically this is done in the ngOnInit() method.

Syntax:
ngOnInit() {
  this.myObservable.subscribe();
}

As mentioned before, we can also use the optional callback methods.

Let’s log the number stream from the observable to the browser console with another arrow function.

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  myObservable = new Observable((observer) => {
    console.log('Starting observable');

    observer.next('1');
    observer.next('2');
    observer.next('3');
    observer.next('4');
    observer.next('5');
  });

  ngOnInit() {
    this.myObservable.subscribe(
      value => { console.log(value) }
    )
  }
}

The observable sends the stream of data as soon as the observer subscribes to it. The observer then uses the data, printing it to the console.

If we open the browser’s Developer Tools and go to the Console pane, we can see each number printed on a separate line.

We can also add a timeout to act as a delay between the values in the data stream.

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  myObservable = new Observable((observer) => {
    console.log('Starting observable');

    setTimeout(() => { observer.next('1') }, 1000);
    setTimeout(() => { observer.next('2') }, 2000);
    setTimeout(() => { observer.next('3') }, 3000);
    setTimeout(() => { observer.next('4') }, 4000);
    setTimeout(() => { observer.next('5') }, 5000);
  });

  ngOnInit() {
    this.myObservable.subscribe(
      value => { console.log(value) }
    )
  }
}
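
Timers like these keep running until they fire, even if nobody is listening anymore. One common way to cancel them, sketched below, is to return a teardown function from the function passed to the constructor and to call unsubscribe() on the subscription. The names delayed and timersCleared are made up for illustration.

```typescript
import { Observable } from 'rxjs';

// Flag used only to show that the teardown function ran.
let timersCleared = false;

const delayed = new Observable<string>((observer) => {
  const timers = [
    setTimeout(() => observer.next('1'), 1000),
    setTimeout(() => observer.next('2'), 2000),
  ];

  // Teardown function: RxJS calls it when the observer unsubscribes.
  return () => {
    timers.forEach((timer) => clearTimeout(timer));
    timersCleared = true;
  };
});

const subscription = delayed.subscribe((value) => console.log(value));

// Cancel before any value arrives, so nothing is logged.
subscription.unsubscribe();
```

In a component, the ngOnDestroy() method is a typical place to call unsubscribe().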

4. As mentioned before, an observable can output an error with the error() callback.

The observable will stop the data stream after emitting the error signal.

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  myObservable = new Observable((observer) => {
    console.log('Starting observable');

    setTimeout(() => { observer.next('1') }, 1000);
    setTimeout(() => { observer.next('2') }, 2000);
    setTimeout(() => { observer.error('Error') }, 2500);
    setTimeout(() => { observer.next('3') }, 3000);
    setTimeout(() => { observer.next('4') }, 4000);
    setTimeout(() => { observer.next('5') }, 5000);
  });

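  ngOnInit() {
    this.myObservable.subscribe({
      // Called for each value in the stream
      next: value => { console.log(value) },
      // Called once if the stream fails; no values follow
      error: err => { console.error(err) }
    })
  }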
}

The example above emits 1 and 2, then the error. The values 3, 4 and 5 never reach the observer, because the stream ends at the error.
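
The same behavior can be checked without timers. The following is a minimal sketch with made-up names (failing, values, errors) that emits synchronously, so the result is available right after subscribe() returns.

```typescript
import { Observable } from 'rxjs';

const values: string[] = [];
const errors: string[] = [];

const failing = new Observable<string>((observer) => {
  observer.next('1');
  observer.next('2');
  observer.error('Error');
  observer.next('3'); // ignored: the stream has already ended
});

failing.subscribe({
  next: (value) => values.push(value),
  error: (err) => errors.push(String(err)),
});

console.log(values); // ['1', '2']
console.log(errors); // ['Error']
```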

5. The complete() callback ends the stream in the same way. Once it has been invoked, no further values are delivered.

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  myObservable = new Observable((observer) => {
    console.log('Starting observable');

    setTimeout(() => { observer.next('1') }, 1000);
    setTimeout(() => { observer.next('2') }, 2000);
    setTimeout(() => { observer.complete() }, 2500);
    setTimeout(() => { observer.next('3') }, 3000);
    setTimeout(() => { observer.next('4') }, 4000);
    setTimeout(() => { observer.next('5') }, 5000);
  });

  ngOnInit() {
    this.myObservable.subscribe({
      next: value => { console.log(value) },
      // Called once when the stream ends normally
      complete: () => { console.log('Complete') }
    })
  }
}
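
Completion also closes the subscription itself. The sketch below, again with made-up names and synchronous emissions, shows that a value sent after complete() is dropped and that the closed property of the subscription becomes true.

```typescript
import { Observable } from 'rxjs';

const values: string[] = [];
let completed = false;

const finite = new Observable<string>((observer) => {
  observer.next('1');
  observer.next('2');
  observer.complete();
  observer.next('3'); // ignored: the stream has already completed
});

const subscription = finite.subscribe({
  next: (value) => values.push(value),
  complete: () => { completed = true; },
});

console.log(values);              // ['1', '2']
console.log(completed);           // true
console.log(subscription.closed); // true
```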

How to create an Observable with the create() method

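RxJS also provides the create() method for creating observables. Note that create() is deprecated in current versions of RxJS in favor of the constructor, so it’s mostly useful to know when reading older code.

The create() method simply calls the constructor behind the scenes so it looks almost the same.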

Example: app.component.ts
import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  ngOnInit() {

    const myObservable = Observable.create((observer: any) => {
      console.log('Starting observable');

      setTimeout(() => { observer.next('1') }, 1000);
      setTimeout(() => { observer.next('2') }, 2000);
      setTimeout(() => { observer.next('3') }, 3000);
      setTimeout(() => { observer.next('4') }, 4000);
      setTimeout(() => { observer.next('5') }, 5000);
    });

    myObservable.subscribe(
      (value: any) => { console.log(value) }
    )
  }
}

Notice that we specify the any type in the example above. The create() method is loosely typed, so TypeScript can’t infer the types of the observer and the emitted values, and it’s recommended to specify them explicitly.
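
For comparison, here is a sketch of the constructor with a type parameter, which avoids the any annotations. The names typed and received are made up for illustration.

```typescript
import { Observable } from 'rxjs';

const received: string[] = [];

// The type parameter tells TypeScript what the stream carries.
const typed = new Observable<string>((observer) => {
  observer.next('a');
  observer.next('b');
});

// `value` is inferred as string, so string methods type-check.
typed.subscribe((value) => received.push(value.toUpperCase()));

console.log(received); // ['A', 'B']
```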

How to create an Observable with the of operator

The of operator creates an observable from the arguments that we pass to it.

We can pass just about anything to the of operator. Each argument will be emitted separately and in the same sequence as the input.

It isn’t a method of the Observable class, so we need to import it separately.

Example:
import { of } from 'rxjs';

As an example, let’s emit two strings and print them in the browser’s console.

Example: app.component.ts
import { Component } from '@angular/core';
import { of } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  ngOnInit() {

    const myObservable = of('Hello', 'there');

    myObservable.subscribe(
      value => { console.log(value) }
    )
  }
}

Each argument is printed separately.

As mentioned before, we can pass almost anything as an argument: strings, arrays, objects. Note that an array is emitted whole, as a single value.

Example: app.component.ts
import { Component } from '@angular/core';
import { of } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  ngOnInit() {

    const fullName = 'John Doe';
    const numArray = [1,2,3];

    const myObservable = of(
      'Hello there',
      fullName,
      ['a', 'b', 'c'],
      numArray
    );

    myObservable.subscribe(
      value => { console.log(value) }
    )
  }
}

How to create an Observable with the from operator

The from operator takes anything that can be iterated and converts it into an observable. Unlike the of operator, it accepts only a single argument.

The from operator isn’t a method of the Observable class, so we need to import it separately.

Example:
import { from } from 'rxjs';

As an example, let’s consider a string. Because a string can be iterated character by character, it is a valid argument.

Example: app.component.ts
import { Component } from '@angular/core';
import { from } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  ngOnInit() {

    const myObservable = from('Hello there');

    myObservable.subscribe(
      value => { console.log(value) }
    )
  }
}

The from operator will iterate over each character in the string and emit them separately.

Output: console
H
e
l
l
o

t
h
e
r
e
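
Strings are only one kind of iterable. As a further sketch, the example below passes an array and a Set to the from operator and collects what each one emits. The names letters and ids are made up for illustration.

```typescript
import { from } from 'rxjs';

const letters: string[] = [];
const ids: number[] = [];

// An array is iterable, so each element is emitted separately.
from(['a', 'b', 'c']).subscribe((value) => letters.push(value));

// A Set is iterable too; it holds each distinct value once.
from(new Set([1, 2, 2, 3])).subscribe((value) => ids.push(value));

console.log(letters); // ['a', 'b', 'c']
console.log(ids);     // [1, 2, 3]
```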

We can also use the from operator to convert a promise into an observable.

Example: app.component.ts
import { Component } from '@angular/core';
import { from } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})

export class AppComponent {

  ngOnInit() {

    const myObservable = from(
      new Promise(resolve => resolve('Hello there'))
    );

    myObservable.subscribe(
      value => { console.log(value) }
    )
  }
}

Operators: Of vs From

Let’s see the differences between the two operators in the following table.

of                             | from
Accepts multiple arguments     | Accepts only one argument
Emits each argument “as-is”    | Iterates over the values of its argument

So when do we use which?

  • The of operator is typically used when we need multiple separate pieces of data, like strings and numbers.
  • The from operator is typically used in situations where we need to break up lists, like an array of employee details.
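
The difference is easiest to see when both operators receive the same array. In the sketch below, the employees array and the two collector arrays are made up for illustration.

```typescript
import { of, from } from 'rxjs';

// Hypothetical data used only for the comparison.
const employees = ['Ann', 'Bob', 'Cy'];

const emittedByOf: string[][] = [];
const emittedByFrom: string[] = [];

// of emits the array as one value.
of(employees).subscribe((value) => emittedByOf.push(value));

// from emits each element of the array as its own value.
from(employees).subscribe((value) => emittedByFrom.push(value));

console.log(emittedByOf.length);   // 1
console.log(emittedByFrom.length); // 3
```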