Friday, April 30, 2010

Grafting Mule Endpoints

Note: The following code samples are applicable to Mule 2.


In Mule ESB, outbound dispatching to a destination whose address is known only at runtime is a pretty trivial endeavor. A less frequent practice consists of programmatically defining inbound service endpoints.

I recently had to do such a thing for a little side project I'm running, where Mule is used as a frontal bus and load throttler in front of R nodes exposed over RMI. The goal was to have a non-fixed number of file inbound endpoints defined in a simple properties file and to declare them on a particular service during Mule's initialization sequence.

As an integration framework, Mule ESB exposes all its moving parts and lets you configure them easily with its Spring-powered XML DSL: that's all we need to achieve the above goal.

Let's first look at the resulting service configuration:


As you can see, the inbound router doesn't have any endpoint configured on it. This is where we will programmatically graft the file endpoints configured in an external properties file.

Before digging into the code used for this grafting operation, let's look at how the grafter itself is configured:


Unsurprisingly, we use a Spring-configured POJO to generate the endpoints. Notice how the service and the file connector are referenced: instead of using names, I'm directly referencing Mule configuration elements. Because Spring is used consistently behind the scenes, this kind of cross-referencing is possible, and it is the key to many advanced tricks!

Now take a deep breath and take a look at the code in charge of grafting the endpoints to the target service:


The important things to pay attention to are the following:
  • The class implements MuleContextAware in order to receive an instance of the MuleContext, which is the key to the gate of Mule's innermost workings. Some might consider fetching this context from the connector object that also gets injected in this class: I personally find this less desirable for design reasons that I'll let Demeter explain.
  • The endpoint is bound to the desired connector by passing its name in the URI used to create it. This allows picking up the right connector, which is compulsory for any Mule configuration with more than one instance of a particular connector (file connectors in this case).
  • Endpoint specific configuration parameters, like moveToDirectory, are configured as extra URI parameters. You can also add other parameters, as key/value pairs: they will be automatically added to the message properties dispatched from this endpoint.
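The last two points can be illustrated with a small, Mule-independent sketch of how such an endpoint URI might be assembled. The class and method names are hypothetical, and the `connector` query parameter name is my assumption of how the connector is selected in the URI; the values are also assumed to be URI-safe, so no escaping is done here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Mule: it only builds the URI string.
final class FileEndpointUris {

    // Assumption: the connector is selected with a "connector" URI parameter.
    static final String CONNECTOR_PARAM = "connector";

    private FileEndpointUris() {
    }

    // Builds a file endpoint URI bound to a named connector, with extra
    // endpoint parameters (for example moveToDirectory) appended in order.
    static String inboundUri(String directory, String connectorName,
                             Map<String, String> extraParams) {
        StringBuilder uri = new StringBuilder("file://").append(directory);
        uri.append('?').append(CONNECTOR_PARAM).append('=').append(connectorName);
        for (Map.Entry<String, String> param : extraParams.entrySet()) {
            uri.append('&').append(param.getKey()).append('=').append(param.getValue());
        }
        return uri.toString();
    }

    // Convenience for the common case of a single extra parameter.
    static String inboundUri(String directory, String connectorName,
                             String paramName, String paramValue) {
        Map<String, String> params = new LinkedHashMap<String, String>();
        params.put(paramName, paramValue);
        return inboundUri(directory, connectorName, params);
    }
}
```

Under these assumptions, calling `FileEndpointUris.inboundUri("/tmp/in", "fileConnector", "moveToDirectory", "/tmp/done")` yields `file:///tmp/in?connector=fileConnector&moveToDirectory=/tmp/done`, a string that would then be handed to the endpoint factory to create the inbound endpoint.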
And voilà: though you may never have to do this kind of thing in your Mule ESB projects, you've gained some deeper insight into what a reasonably skilled gardener can do with this powerful platform.

17 comments:

zepag said...

Thank you MuleGuru (a brand new beast?)! This will soon come in handy!

Anonymous said...

Great post! Just one question, is it possible to add a ‘file:filename-wildcard-filter’ to these endpoints?

David Dossot said...

It's possible to add a filter to these dynamic endpoints, but to do so you need to call a different method on the endpointFactory, i.e. getInboundEndpoint(EndpointBuilder builder) instead of getInboundEndpoint(String uri).

This allows you to create an endpoint builder that has a filter defined on it:

EndpointBuilder endpointBuilder = new EndpointURIEndpointBuilder(uri, muleContext);
endpointBuilder.setFilter(filter);

then build an endpoint with this builder.

joe said...

A lot of changes have been introduced in the new 3.0.0-RC1 version. Classes such as 'InboundRouterCollection' don't exist anymore.

David Dossot said...

Joe, you're right. I've added a note on the first line of the post.

joe said...

Hi, I am trying to build a quartz endpoint dynamically (similar to the file endpoint above). How would you set a "quartz:event-generator-job" for the endpoint (as in the config below)? This is similar to the 'file:filename-wildcard-filter' above. Any help is appreciated.

David Dossot said...

Joe, I've never tried with Quartz endpoints but from what I can see you would need to set a property on the endpoint builder named "jobConfig" and with a value of type org.mule.transport.quartz.jobs.EventGeneratorJobConfig.

Anonymous said...

I've been using this in 2.x, and now while trying to upgrade to 3.0 it breaks.

I've been reading up on the "new Pipes & Filters" architecture (Message Source, Message Processor, InterceptingMessageProcessor, Listeners etc.). Due to the lack of good documentation on these API changes, I still couldn't find a straightforward way to migrate to the new API.

All I've been doing in 2.x was creating multiple file inbound endpoints with FilenameWildcardFilters, on the fly (when mule starts up). Would you be able to post some snippets of code about how it could be done using 3.0? Or at least some pointers to how this could be accomplished. Hope I am not asking for too much!

David Dossot said...

Hey Anonymous,

Indeed, the API changed a lot in Mule 3.

The migration guides are here and there.

In your case, if you add several inbound endpoints to the same service, you want to group them in an org.mule.api.source.CompositeMessageSource.

Anonymous said...

The issue with the migration guide (Summary of API changes) is that it doesn't address the removal of InboundRouterCollection (and a host of other classes).

Here is where I am (stuck). I am hoping that when you glance over this, you could quickly point out what's wrong.

I have a service defined this way:









In the code I wire-in service "abc" as "service", and do this:

CompositeMessageSource compMessageSource = new ServiceCompositeMessageSource();
service.setMessageSource(compMessageSource);
EndpointBuilder endpointBuilder = new EndpointURIEndpointBuilder(endpointURI, muleContext);
InboundEndpoint endpoint = endpointFactory.getInboundEndpoint(endpointBuilder);
compMessageSource.addSource(endpoint);

With the above code, I get this exception:
org.mule.api.lifecycle.LifecycleException: The required object/property "listener" is null
at org.mule.source.StartableCompositeMessageSource.start(StartableCompositeMessageSource.java:143)

So I modified it as below:

CompositeMessageSource compMessageSource = new ServiceCompositeMessageSource();
compMessageSource.setListener(service.getComponent());
service.setMessageSource(compMessageSource);
EndpointBuilder endpointBuilder = new EndpointURIEndpointBuilder(endpointURI, muleContext);
InboundEndpoint endpoint = endpointFactory.getInboundEndpoint(endpointBuilder);
compMessageSource.addSource(endpoint);

With that I get a null pointer at
org.mule.transport.AbstractConnector.registerListener(AbstractConnector.java:1269)

In Mule 2.x, I didn't have to set a listener. Just wondering if you might know the fix. Thanks much.

David Dossot said...

The service XML is gone, use a gist to include it.

I think you should fully create your CompositeMessageSource before connecting to your service.

Since the service is already initialized, to hook your composite to the service, you need something like:

compMessageSource.setListener(service);

And no, I haven't tried that yet :)

Anonymous said...

David,

I finally figured it out. Here it is:

EndpointBuilder endpointBuilder = new EndpointURIEndpointBuilder(address, muleContext);
InboundEndpoint endpoint = endpointFactory.getInboundEndpoint(endpointBuilder);
((CompositeMessageSource)service.getMessageSource()).addSource(endpoint);

Thanks for all your help,
Sajeev Joseph

David Dossot said...

Yep, I saw Daniel's response on the Mule mailing list.

The trick was that Service already has a CompositeMessageSource created (unlike a Flow that would have nothing).

Anonymous said...

David, is it possible to set the Java component (Spring-managed) on the service?

I need to dynamically define the whole service definition: Inbound + Component.

Any tip?

thanks
adrian

David Dossot said...

Adrian, theoretically yes, everything you can do with XML, you can do by code.

Mo said...

Hi David,

I have code similar to this in a custom message processor running in Mule 3.1.3:

muleMessage = inboundEndpoint.request(inboundEndpoint.REQUEST_NO_WAIT)

This seems to ignore the wildcard match or, in general, produces a single event at a time. Any idea why it would behave like this?

Regards,
Mohamed

David Dossot said...

@Mohamed: Please ask your question on SO (http://stackoverflow.com/questions/tagged/mule) and provide more details (config, code) so the issue can be reproduced easily.