rabbit_force

Documentation Status Build status Coverage MIT license

rabbit_force is a Salesforce Streaming API to RabbitMQ adapter service. It listens for event messages from Salesforce’s Streaming API and forwards them to a RabbitMQ broker for you, so you don’t have to.

Streaming API is useful when you want notifications to be pushed from the server to the client based on criteria that you define with PushTopics or to receive generic streaming messages. While RabbitMQ is one of the most popular options for implementing inter-service communication in a microservice architecture.

While there are lots of great client implementations for RabbitMQ/AMQP for various languages, there are a lot less Streaming API clients, and many of them are badly maintained. Furthermore RabbitMQ offers much more flexible message consumption techniques.

rabbit_force aims to fix these problems, by providing and adapter between the Streaming API and RabbitMQ, so inter-connected services can consume Streaming API event messages with RabbitMQ. It even supports connection with multiple Salesforce orgs, multiple RabbitMQ brokers, and routing messages between them.

Features

  • Forward Streaming API messages from one or more Salesforce orgs to one or more RabbitMQ brokers

  • Route incoming messages to a specific broker and exchange with the specified routing key and properties, with the help of routing rules defined as JSONPath expressions

  • Support for Salesforce’s replay extension for message reliability and durability by storing replay markers in a Redis database

  • Configurable error handling behavior, either fail instantly or try to recover from network and service outages

  • Message sources, sinks and routing configurable with JSON or YAML configuration files

  • Implemented using python asyncio for efficient handling of IO intensive operations

Usage

A relatively simple use case is illustrated on the image bellow. rabbit_force is connected to a single message source and message sink, or in other words to a single Salesforce org named as my_org and a single RabbitMQ broker named my_broker. It listens for messages from two PushTopics (lead_changes and contact_changes) and a StreamingChannel (my_channel), and forwards messages into the exchange my_exchange with different routing keys. A redis database is used to store replay markers sent by Salesforce to take advantage of message durability.

_images/usage.svg

The configuration file bellow sets up rabbit_force to forward messages from lead_changes, contact_changes and my_channel with the routing keys of lead_change_message, contact_change_message and my_channel_message respectively.

config.yaml
# message source definition
source:
  # mapping of Salesforce orgs to use as message sources
  orgs:
    # a Salesforce org named as "my_org"
    my_org:
      # authentication credentials
      consumer_key: "<consumer_key>"
      consumer_secret: "<consumer_secret>"
      username: "<username>"
      password: "<password>"
      # list of resources that the service can listen to for messages
      # if they doesn't exist, they'll be created on application startup
      resources:
        # a PushTopic resource
        - type: PushTopic
          # the definition of the PushTopic
          spec:
            Name: lead_changes
            ApiVersion: 42.0
            NotifyForFields: Referenced
            NotifyForOperationCreate: true
            NotifyForOperationUpdate: true
            NotifyForOperationDelete: true
            NotifyForOperationUndelete: true
            Query: SELECT Id, Email, Name, Phone, MobilePhone, Status, LeadSource FROM Lead
          # optional durable flag, if false then the resource will be removed on application shutdown
          durable: false

        # a PushTopic resource
        - type: PushTopic
          # the definition of the PushTopic
          spec:
            Name: contact_changes
            ApiVersion: 42.0
            NotifyForFields: Referenced
            NotifyForOperationCreate: true
            NotifyForOperationUpdate: true
            NotifyForOperationDelete: true
            NotifyForOperationUndelete: true
            Query: SELECT Id, Email, Name, Phone, MobilePhone FROM Contact
          # optional durable flag, if false then the resource will be removed on application shutdown
          durable: false

        # a StreamingChannel resource
        - type: StreamingChannel
          # the definition of the StreamingChannel
          spec:
            Name: /u/my_channel
            Description: Streaming channel for notifications
  # optional replay storage definition. if defined it'll be used to store replay
  # markers sent by Salesforce in order to support message durability
  replay:
    # redis server address
    address: "redis://localhost:6389"
    # key prefix
    key_prefix: replay

# message sink definition
sink:
  # mapping of RabbitMQ brokers to use as message sinks
  brokers:
    # a RabbitMQ broker named as "by_broker"
    my_broker:
      # host name of the broker
      host: localhost
      # definition of the exchange where the messages should be forwarded
      exchanges:
        - exchange_name: my_exchange
          type_name: topic
          durable: true

# message router definition
router:
  # optional default route to use if no routing rule matches a given message
  default_route:
    broker_name: my_broker
    exchange_name: my_exchange
    routing_key: my_channel_message

  # list of routing rules
  rules:
    # JSONPath filter expression as the condition
    - condition: "$[?(@.message.channel ~ '.*/lead_changes')]"
      # the route to use if the condition produces a non-empty match
      route:
        broker_name: my_broker
        exchange_name: my_exchange
        routing_key: lead_change_message
    # JSONPath filter expression as the condition
    - condition: "$[?(@.message.channel ~ '.*/contact_changes')]"
      # the route to use if the condition produces a non-empty match
      route:
        broker_name: my_broker
        exchange_name: my_exchange
        routing_key: contact_change_message

A sample run of rabbit_force with the above configuration file.

$ python -m rabbit_force config.yaml
2018-06-19 16:23:07,909:INFO: Starting up ...
2018-06-19 16:23:07,996:INFO: Configuration loaded from 'config.yaml'
2018-06-19 16:23:07,999:INFO: Configuring application ...
2018-06-19 16:23:10,619:INFO: Using message broker AmqpBroker(host='localhost', port=None, login='guest', password='guest', virtualhost='/', ssl=False, login_method='AMQPLAIN', insist=False, verify_ssl=True)
2018-06-19 16:23:12,128:INFO: Listening for messages from Salesforce org 'my_org':
    * from PushTopic 'lead_changes' on channel '/topic/lead_changes'
    * from PushTopic 'contact_changes' on channel '/topic/contact_changes'
    * from StreamingChannel '/u/my_channel' on channel '/u/my_channel'
With replay storage RedisReplayStorage(address='redis://localhost:6389', key_prefix='replay:my_org', additional_params={}, ignore_network_errors=False).
2018-06-19 16:23:48,119:INFO: Forwarded message 1 on channel '/topic/lead_changes' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='lead_change_message', properties=None).
2018-06-19 16:24:03,039:INFO: Forwarded message 1 on channel '/topic/contact_changes' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='contact_change_message', properties=None).
2018-06-19 16:24:20,180:INFO: Forwarded message 1 on channel '/u/my_channel' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='my_channel_message', properties=None).
2018-06-19 16:24:27,097:INFO: Shutting down ...

Indices and tables