Seja Reativo com Reactive Extensions

28 mar

Fala Galera,

Neste post vou falar sobre aplicações reativas e quando pensamos nesse assunto nos perguntamos por que desenvolver uma aplicação reativa?

Por que usuários esperam dados reais em tempo real, eles esperam que a confirmação de uma compra online seja confirmada na hora, eles esperam que os jogos online sejam responsivos e respondam a determinadas ações.

A programação reativa baseia-se em fluxos de dados e propagação de mudanças com o modelo de execução de uma linguagem de programação repercutindo automaticamente através do fluxo de dados.

Para se construir aplicações reativas, as aplicações reativas devem ser capazes de:

  • Reagir a eventos – a natureza de ser orientada a eventos e reagir conforme a propagação de um evento
  • Reagir a cargas – Deve ser escalável
  • Reagir a falhas – Ser resiliente a falhas em todos os níveis
  • Reagir aos usuários – Combinar todas as características acima citada para proporcionar uma experiência de usuário interativa.

E para atender a demandas de aplicações reativas eis que surgi o Rx .NET (Reactive Extensions). 

O que é Rx .NET (Reactive Extensions)

O Reactive Extensions (Rx) é uma biblioteca open source para nós desenvolvedores compor programas assíncronos baseados em eventos usando LINQ e o padrão de sequência Observable

Com o Rx podemos representar fluxos de dados assíncronos com Observable, fazer consultas em fluxos de dados assíncronos usando operadores LINQ e também podemos parametrizar o fluxo concorrente de dados usando Schedulers.

Como funciona

Usando o Rx, nós podemos representar múltiplos fluxos de dados assíncronos, esses fluxos de dados podem ser por exemplo cotações de ações, twitters com determinadas hashtag, web service request entre outros e subscrever os eventos do fluxo de dados usando a interface IObserver<T>. A interface IObserver<T> notifica o nossa aplicação quando um evento ocorre e assim podemos tratar esse evento e fazer determinadas ações.

O Rx também fornece métodos para tratamento de exceções, cancelamentos, sincronizações entre outros.

Arquitetura do Rx .NET

Usando o Rx .NET (Reactive Extensions)

O Rx .NET está disponível  no NuGet e para usar a biblioteca basta executar esse comando no Package Manager Console: Install-Package Rx-Main -Pre

Vamos a alguns exemplos de como usar o Rx.Net

Observando um determinado Evento:

public static event EventHandler SampleEvent;

static void Main(string[] args)
{
   var eventAsObservable = Observable.FromEventPattern(ev => SampleEvent += ev, 
                                                       ev => SampleEvent -= ev);

   //Create event subscriber
   var s = eventAsObservable.Subscribe(x => Console.WriteLine("evento recebido"));

   Console.WriteLine("Raise event");
   if (SampleEvent != null)
       SampleEvent(null, EventArgs.Empty);

   Thread.Sleep(100);

   Console.WriteLine("Unsubscribe");
   s.Dispose();
   Console.ReadKey();
}

 

Disparando um Subscribe por Intervalos

static void Main()
{
 
   var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval();

   using (observable.Subscribe(
          x => Console.WriteLine("{0}: {1}", x.Value, x.Interval)))
   {
     Console.WriteLine("Press any key to unsubscribe");
     Console.ReadKey();
   }

   Console.WriteLine("Press any key to exit");


   Console.ReadKey();
}

 

Observando um Range de Valores

static void Main(string[] args)
{
    Samples().Wait();
}

static async Task Samples()
{
    var xs = Observable.Range(0, 10, ThreadPoolScheduler.Instance);

    Console.WriteLine("Last  = " + await xs);
    Console.WriteLine("First = " + await xs.FirstAsync());
    Console.WriteLine("Third = " + await xs.ElementAt(3));
    Console.WriteLine("All   = " + string.Join(", ", await xs.ToList()));

    try
    {
        Console.WriteLine("Erro = " + await xs.Select(x => 1 / (5 - x)));
    }
    catch (DivideByZeroException)
    {
        Console.WriteLine("Aconteceu um Erro");
    }
}

 

Suporte a Subscribe Async

static void Main(string[] args)
{
    var fsw = new FileSystemWatcher(@"C:\")
    {
       IncludeSubdirectories = true,
       EnableRaisingEvents = true
    };

    var changes = from c in Observable.FromEventPattern<FileSystemEventArgs>(fsw, "Changed")
                  select c.EventArgs.FullPath;
    changes.Window(TimeSpan.FromSeconds(5)).Subscribe(async window =>
    {
         var count = await window.Count();
         Console.WriteLine("{0} events last 5 seconds", count);
    });

    Console.ReadLine();
}

 

Fazendo um Jogo Ping-Pong Reativo

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ReactiveExtensions
{
    class Program
    {
        static void Main(string[] args)
        {
            var ping = new Ping();
            var pong = new Pong();

            Console.WriteLine("Press any key to stop ...");

            var pongSubscription = ping.Subscribe(pong);
            var pingSubscription = pong.Subscribe(ping);

            Console.ReadKey();

            pongSubscription.Dispose();
            pingSubscription.Dispose();

            Console.WriteLine("Ping Pong has completed.");
        }
    }

    class Ping : ISubject<Pong, Ping>
    {
        public void OnNext(Pong value)
        {
            Console.WriteLine("Ping received Pong.");
        }

        public void OnError(Exception exception)
        {
            Console.WriteLine("Ping experienced an exception and had to quit playing.");
        }

        public void OnCompleted()
        {
            Console.WriteLine("Ping finished.");
        }

        public IDisposable Subscribe(IObserver<Ping> observer)
        {
            return Observable.Interval(TimeSpan.FromSeconds(2))
                .Where(n => n < 10)
                .Select(n => this)
                .Subscribe(observer);
        }

        public void Dispose()
        {
            OnCompleted();
        }


    }

    class Pong : ISubject<Ping, Pong>
    {
        public void OnNext(Ping value)
        {
            Console.WriteLine("Pong received Ping.");
        }

        public void OnError(Exception exception)
        {
            Console.WriteLine("Pong experienced an exception and had to quit playing.");
        }

        public void OnCompleted()
        {
            Console.WriteLine("Pong finished.");
        }


        public IDisposable Subscribe(IObserver<Pong> observer)
        {
            return Observable.Interval(TimeSpan.FromSeconds(1.5))
                .Where(n => n < 10)
                .Select(n => this)
                .Subscribe(observer);
        }
        public void Dispose()
        {
            OnCompleted();
        }

        
    }
}

 

O Rx .NET é a ferramenta ideal para criarmos aplicações reativas e incrivelmente poderosas. Com aplicações reativas conseguimos aumentar o nível de interatividade dos nossos usuários utilizando esse modelo de programação em aplicações web e até jogos.

O que acharam do Rx .NET ? Não deixem de comentar.

Para mais informações sobre o Rx .Net clique aqui.

Abs e até a próxima.

 

Deixe uma resposta

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Esse site utiliza o Akismet para reduzir spam. Aprenda como seus dados de comentários são processados.