ETL Capabilities of WSO2 Enterprise Integrator — Part II

Real-time data migration from Salesforce to Data Lake (MongoDB) and legacy database (RDBMS)

Natasha Wijesekare
4 min read · May 17, 2019

The second blog of the series takes you through migrating real-time data from Salesforce to a Data Lake (MongoDB) and a legacy database system (RDBMS). We use WSO2 EI Tooling to create and manage the required artifacts.

A brief overview of the use case is as follows:

  • The legacy database system used here will be a MySQL database that stores information about Salesforce accounts.
  • MongoDB will be used as the data store for the Data Lake, storing the same Salesforce account information.
  • The legacy database and the Data Lake will be exposed as two separate data services. Each data service will expose SOAP operations or REST resources to add, update and delete records.
  • The Salesforce streaming inbound endpoint will be used to receive notifications for changes to Salesforce data that match a defined Salesforce Object Query Language (SOQL) query. The inbound endpoint acts as a message consumer: it creates a connection to the Salesforce account, consumes the Salesforce data and injects it into the specified sequence.

The diagram below gives you a high-level overview of the use case:

Real-time data migration from Salesforce to Data Lake (MongoDB) and legacy database (RDBMS)

Follow the steps given below:

  1. Define two data services to expose the legacy database and the Data Lake. Each data service should expose SOAP operations or REST resources to add, update and delete records (a minimal sketch follows this list).
  2. To use the Salesforce streaming inbound endpoint, create a PushTopic that contains a SOQL query, for example SELECT Id, Name FROM Account. The query should retrieve information about modifications made to accounts.
  3. Configure the Salesforce streaming inbound endpoint to retrieve account details from your Salesforce account.
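
For step 1, here is a minimal sketch of a data service (.dbs) configuration for the MySQL-backed legacy database. The datasource URL, credentials, table and column names (accounts_db, ACCOUNT, insertAccount and so on) are placeholders for illustration; the MongoDB data service is written the same way using a Mongo datasource and Mongo query expressions instead of SQL.

    <data name="LegacyAccountDataService" transports="http https">
       <config id="mysqlDS">
          <property name="driverClassName">com.mysql.jdbc.Driver</property>
          <property name="url">jdbc:mysql://localhost:3306/accounts_db</property>
          <property name="username">db_user</property>
          <property name="password">db_password</property>
       </config>
       <!-- Parameterised SQL executed against the legacy database -->
       <query id="insertAccountQuery" useConfig="mysqlDS">
          <sql>INSERT INTO ACCOUNT (ACCOUNT_ID, ACCOUNT_NAME) VALUES (?, ?)</sql>
          <param name="accountId" sqlType="STRING"/>
          <param name="accountName" sqlType="STRING"/>
       </query>
       <!-- SOAP operation exposed by the data service -->
       <operation name="insertAccount">
          <call-query href="insertAccountQuery">
             <with-param name="accountId" query-param="accountId"/>
             <with-param name="accountName" query-param="accountName"/>
          </call-query>
       </operation>
    </data>

The “update” and “delete” operations are defined the same way with the corresponding SQL statements.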

Design view of the Salesforce inbound endpoint
Source view of the Salesforce inbound endpoint
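
In source view, such an inbound endpoint looks roughly like the sketch below. The sequence name updateAccountSeq comes from this use case; the PushTopic name, credentials, file paths and version numbers are placeholders, and the full parameter list is described in the connector documentation linked at the end of this post.

    <inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="SalesforceInboundEP"
                     class="org.wso2.carbon.inbound.salesforce.poll.SalesforceStreamData"
                     sequence="updateAccountSeq" onError="faultSeq" suspend="false">
       <parameters>
          <parameter name="inbound.behavior">polling</parameter>
          <parameter name="interval">100</parameter>
          <parameter name="sequential">true</parameter>
          <parameter name="coordination">true</parameter>
          <!-- Channel of the PushTopic created earlier -->
          <parameter name="connection.salesforce.salesforceObject">/topic/AccountUpdates</parameter>
          <parameter name="connection.salesforce.loginEndpoint">https://login.salesforce.com</parameter>
          <parameter name="connection.salesforce.userName">user@example.com</parameter>
          <parameter name="connection.salesforce.password">passwordANDsecuritytoken</parameter>
          <parameter name="connection.salesforce.packageVersion">37.0</parameter>
          <parameter name="connection.salesforce.waitTime">5000</parameter>
          <parameter name="connection.salesforce.connectionTimeout">20000</parameter>
          <parameter name="connection.salesforce.soapApiVersion">22.0</parameter>
       </parameters>
    </inboundEndpoint>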

The data consumed from Salesforce will be injected into a sequence named ‘updateAccountSeq’, as specified in the inbound endpoint configuration. This sequence will update the account information in the legacy database and the Data Lake.

Source view of the sequence that is invoked by the inbound endpoint

Deploy the Salesforce inbound endpoint and the sequence in EI. Now let’s go through the sequence and understand what each element does.

The property mediators query the body of the notification using XPath, extract the required values and store them as properties for later use.
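
For illustration, assuming the notification has been built as XML by the message builder, property mediators along the following lines capture the event type and the account fields; the exact expressions depend on the fields your PushTopic query selects:

    <!-- Extract the event type: created / updated / deleted -->
    <property name="eventType" expression="//event/type/text()" scope="default" type="STRING"/>
    <!-- Extract the fields selected by the PushTopic query -->
    <property name="accountId" expression="//sobject/Id/text()" scope="default" type="STRING"/>
    <property name="accountName" expression="//sobject/Name/text()" scope="default" type="STRING"/>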

Salesforce publishes three event types:

  1. created: a Salesforce account was newly created
  2. updated: an existing Salesforce account was updated/modified
  3. deleted: an existing Salesforce account was deleted

The operations we perform on the legacy database and the Data Lake vary based on the event type:

  1. If a new Salesforce account was created, we execute the “insert” operation to insert the new record into both data sources.
  2. If properties of an existing Salesforce account were updated, we execute the “update” operation to update the modified record in both data sources.
  3. If an existing Salesforce account was deleted, we execute the “delete” operation to delete the record from both data sources.

We have used a switch mediator for this purpose, as sketched below. The switch mediator retrieves the event type using XPath and routes the message by matching the returned string against each case statement.
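
A sketch of the switch mediator, matching on the eventType property extracted earlier (case bodies elided):

    <switch source="$ctx:eventType">
       <case regex="created">
          <!-- build and send the "insert" requests to both data services -->
       </case>
       <case regex="updated">
          <!-- build and send the "update" requests to both data services -->
       </case>
       <case regex="deleted">
          <!-- build and send the "delete" requests to both data services -->
       </case>
       <default>
          <log level="custom">
             <property name="message" value="Unknown event type received"/>
          </log>
       </default>
    </switch>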

Suppose the event type is “created”, meaning a new account was added via the Salesforce UI. Then the “insert” operations of the two data services are executed one after the other. Here we use the PayloadFactory mediator to transform the message content into the format the data service expects; since we have exposed a SOAP operation, the PayloadFactory mediator constructs a payload wrapped in a SOAP envelope.
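
Inside the “created” case, a PayloadFactory mediator along these lines builds the SOAP request for the legacy data service. The insertAccount operation name and the http://ws.wso2.org/dataservice namespace are assumptions carried over from the data service sketch above; they must match what your data service actually exposes:

    <payloadFactory media-type="xml">
       <format>
          <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                            xmlns:dat="http://ws.wso2.org/dataservice">
             <soapenv:Body>
                <dat:insertAccount>
                   <dat:accountId>$1</dat:accountId>
                   <dat:accountName>$2</dat:accountName>
                </dat:insertAccount>
             </soapenv:Body>
          </soapenv:Envelope>
       </format>
       <args>
          <!-- Values captured earlier by the property mediators -->
          <arg evaluator="xml" expression="$ctx:accountId"/>
          <arg evaluator="xml" expression="$ctx:accountName"/>
       </args>
    </payloadFactory>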

Afterwards we use the call mediator to invoke the relevant SOAP operation, passing the payload in the request. The legacy database is updated first, followed by the Data Lake. The same steps are followed for the other event types, i.e. “updated” and “deleted”.
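
A sketch of the call mediator that invokes the legacy data service over SOAP (the endpoint URL and service name are placeholders); a second call mediator pointing at the MongoDB data service follows it:

    <call>
       <endpoint>
          <!-- SOAP 1.1 endpoint of the deployed data service -->
          <address uri="http://localhost:8280/services/LegacyAccountDataService" format="soap11"/>
       </endpoint>
    </call>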

Add a new account, or update or delete an existing one in Salesforce, and the change is reflected in the legacy database and the Data Lake on the fly :)

For more information, refer to the following resources:

  1. https://docs.wso2.com/display/EI640/Exposing+a+Datasource+as+a+Data+Service
  2. https://docs.wso2.com/display/EI640/Exposing+MongoDB+as+a+Data+Service
  3. https://docs.wso2.com/display/ESBCONNECTORS/Configuring+Salesforce+Streaming+Inbound+Endpoint+Operations
  4. https://github.com/wso2-extensions/esb-inbound-salesforce

This brings us to the end of this blog series, which showcased two ETL use cases supported by WSO2 Enterprise Integrator together with Salesforce. As for how to end with a bang? Well, that’s another blog post for another time, so stay tuned :)
