Saturday, September 13, 2008

How to read messages from a queue


The problem is how to read messages without removing them from the queue when the NMS (C#) client is used.
Unfortunately, ActiveMQ's administrative interfaces are not exposed through NMS, so it is not possible to obtain a list of queues or to enumerate the messages in a queue without removing them.


But knowing how the queue server works gives us an opportunity to cheat. If we open the session in ClientAcknowledge mode, the server waits for a client-issued confirmation before deleting a message from the queue. If we intentionally skip the confirmation, we can keep reading messages up to some limit.


using System;
using System.Collections.Generic;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
...
using(IConnection conn = new ConnectionFactory("tcp://localhost:61616").CreateConnection())
{
    conn.Start();
    // ClientAcknowledge: the broker keeps each message until the client acknowledges it
    ISession session = conn.CreateSession(AcknowledgementMode.ClientAcknowledge);
    IQueue queue = session.GetQueue("aaa");

    // put some test messages into the queue
    IMessageProducer producer = session.CreateProducer(queue);
    for(int i=0; i<10; i++)
        producer.Send(producer.CreateTextMessage("a test "+i));

    List<IMessage> list = new List<IMessage>();
    using(IMessageConsumer iterator = session.CreateConsumer(queue)) {
        // collect messages but never call m.Acknowledge()
        iterator.Listener += delegate(IMessage m) { lock(list) list.Add(m); };
        System.Threading.Thread.Sleep(250); // let the consumer fill its buffer
    }
    Console.WriteLine("First attempt: {0}", list.Count);

    // walk through the messages again to make sure we did not remove them the first time
    list.Clear();
    using(IMessageConsumer iterator = session.CreateConsumer(queue)) {
        iterator.Listener += delegate(IMessage m) { lock(list) list.Add(m); };
        System.Threading.Thread.Sleep(250);
    }
    Console.WriteLine("Second attempt: {0}", list.Count);
}



It is important to open the session in ClientAcknowledge mode and not to acknowledge the messages, so that they are not removed from the queue. Also, I wrapped the consumer in a "using" block in order to call Dispose, which tells the queue server that the messages sent to this consumer are free again. We want to do that as soon as we are done, and not rely on the garbage collector, which can kick in who knows when.


The 250 ms sleep gives the consumer time to fill its buffer. See my notes about the problem here.


The method described is a trick, and there is a limit on how many messages you can receive: ActiveMQ throttles a consumer that receives messages but does not confirm their processing. But you should be fine showing the first hundred or so messages, which is enough for display purposes.
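Why the trick works, and why it has a ceiling, can be shown without a broker. The Ruby sketch below is a toy model, not ActiveMQ or NMS code: a queue dispatches at most `prefetch` unacknowledged messages to a ClientAcknowledge consumer, and when that consumer is closed without acknowledging, the messages go back to the queue. The class and method names are hypothetical, and the limit of 1000 is an assumption (it matches ActiveMQ's usual default queue prefetch, but your broker and client settings may differ).

```ruby
# Toy model of ClientAcknowledge "browsing"; not ActiveMQ or NMS code.
class ToyQueue
  def initialize(messages)
    @pending = messages.dup
  end

  def size
    @pending.size
  end

  # Open a consumer, collect what the broker dispatches, then close the
  # consumer without acknowledging anything.
  def browse_by_not_acking(prefetch)
    dispatched = @pending.shift(prefetch)  # broker stops at the prefetch limit
    seen = dispatched.dup
    @pending.unshift(*dispatched)          # closed without ack: messages return
    seen
  end
end

PREFETCH = 1000 # assumed limit on unacknowledged messages per consumer

queue = ToyQueue.new((0...10).map { |i| "a test #{i}" })
first = queue.browse_by_not_acking(PREFETCH)
second = queue.browse_by_not_acking(PREFETCH)
puts "First attempt: #{first.size}"
puts "Second attempt: #{second.size}"
puts "Still queued: #{queue.size}"

# With more messages than the limit, only the first PREFETCH are visible.
big = ToyQueue.new((0...5000).map { |i| "m#{i}" })
puts "Visible in a big queue: #{big.browse_by_not_acking(PREFETCH).size}"
```

The second attempt sees the same ten messages because nothing was acknowledged, while the big queue shows why only the "first hundred or so" (up to the prefetch limit) can be displayed this way.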

NMS: Consuming ActiveMQ messages Asynchronously



There are some articles that demonstrate how to implement an asynchronous message consumer in C#, but they use the Spring Framework. In my opinion that is not justified on many counts. Spring may or may not be good for your project.

So here we go:

using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace NMSTestConsole
{
    class MainClass
    {
        public static void Main(string[] args)
        {
            using(IConnection conn = new ConnectionFactory("tcp://localhost:61616").CreateConnection())
            {
                // without Start() messages are not dispatched to listeners
                conn.Start();
                ISession session = conn.CreateSession();
                IQueue queue = session.GetQueue("aaa");
                IMessageProducer producer = session.CreateProducer(queue);

                // the listener is invoked asynchronously whenever a message arrives
                IMessageConsumer consumer3 = session.CreateConsumer(session.GetQueue("aaa"));
                consumer3.Listener += delegate(IMessage m) {Console.WriteLine("aaa: \n{0}",m);};

                producer.Send(producer.CreateTextMessage("a test"));

                // keep the process alive so the listener gets a chance to run
                Console.WriteLine("Press any key...");
                Console.ReadKey();
            }
        }
    }
}


Look ma, no Spring!
Please pay attention to the conn.Start() call. Without it, your consumers remain in synchronous mode and will not deliver messages to their listeners.
But if you do not assign a listener to your consumer, you can still receive messages the synchronous way, "Console.WriteLine(consumer3.Receive());", and the connection can remain in asynchronous mode.

Saturday, August 30, 2008

Language complexity irony



I'm in the process of learning Java more deeply, and I am discovering generics. No, I don't mean "Java has generics. Wow!" I mean corner cases like this:

class MyClass<T extends MyClass<T>>

Or this one:

What is the difference between a Collection<Pair<String,Object>>, a Collection<Pair<String,?>> and a Collection<? extends Pair<String,?>>?


It takes significant effort to imagine what such a class is and what its usages and limitations are.
The irony I see is that Java was initially introduced as a simplified C++. Not just C++ with a garbage collector: the structure of the language was simplified in order to make it a programming language for the masses. And what do we see 10 years later? In some aspects Java has become no less difficult than C++. Programming is too complex an area to be done with simple tools.

Lesson learned: do things as simple as possible but not simpler :)

Thursday, August 28, 2008

Vadim's blog

Vadim
I am guessing that log4net wasn't included in order to simplify binary dependencies. But including it would make a lot of sense; log4net is quite popular in the C# community.

Tracing in NMS



It is easy to turn on protocol tracing in the NMS client.
First of all, you need to implement a trace class. I am lazy, so I wrote an adapter for log4net.


/*
 * ----------------------------------------------------------------------------
 * "THE BEER-WARE LICENSE"
 * Vadim Chekan wrote this file. As long as you retain this notice you
 * can do whatever you want with this stuff. If we meet some day, and you think
 * this stuff is worth it, you can buy me a beer in return
 * ----------------------------------------------------------------------------
 */

using System;
using log4net;
using Apache.NMS;

namespace NMSTestConsole
{
    public class TraceAdapter : Apache.NMS.ITrace
    {
        // keep logs on behalf of NMS Tracer
        static readonly ILog _log = LogManager.GetLogger(typeof(Apache.NMS.Tracer));

        public bool IsDebugEnabled {
            get {return _log.IsDebugEnabled;}
        }

        public bool IsInfoEnabled {
            get {return _log.IsInfoEnabled;}
        }

        public bool IsWarnEnabled {
            get {return _log.IsWarnEnabled;}
        }

        public bool IsErrorEnabled {
            get {return _log.IsErrorEnabled;}
        }

        public bool IsFatalEnabled {
            get {return _log.IsFatalEnabled;}
        }

        public void Debug (string message)
        {
            _log.Debug(message);
        }

        public void Info (string message)
        {
            _log.Info(message);
        }

        public void Warn (string message)
        {
            _log.Warn(message);
        }

        public void Error (string message)
        {
            _log.Error(message);
        }

        public void Fatal (object message)
        {
            _log.Fatal(message);
        }
    }
}


Now you need to hook it up somewhere at the very beginning of your program.

using Apache.NMS;
using Apache.NMS.ActiveMQ;
...
Tracer.Trace = new TraceAdapter();
log4net.Config.BasicConfigurator.Configure();


That's it. Run your program that communicates with ActiveMQ and see the result.

Getting more info


There is not much information yet. The output looks like this:

Parsing type: 1 with: Apache.NMS.ActiveMQ.OpenWire.V1.WireFormatInfoMarshaller
Parsing type: 2 with: Apache.NMS.ActiveMQ.OpenWire.V2.BrokerInfoMarshaller
Parsing type: 30 with: Apache.NMS.ActiveMQ.OpenWire.V2.ResponseMarshaller


Those numbers are the IDs of OpenWire packet types. You would probably prefer something more human-readable. You can get it by setting the UseLogging property of TcpTransportFactory. The best way to do it is via broker URL parameters:

IConnection conn = new ConnectionFactory("tcp://localhost:61616?transport.UseLogging=true").CreateConnection();



SENDING: WireFormatInfo[ Magic=System.Byte[] Version=2 MarshalledProperties={CacheEnabled=False, SizePrefixDisabled=False, StackTraceEnabled=False, TcpNoDelayEnabled=False, TightEncodingEnabled=False} ]
414 [-1225462896] DEBUG Apache.NMS.Tracer (null) - Parsing type: 1 with: Apache.NMS.ActiveMQ.OpenWire.V1.WireFormatInfoMarshaller
424 [-1225462896] INFO Apache.NMS.Tracer (null) - RECEIVED: WireFormatInfo[ Magic=System.Byte[] Version=3 MarshalledProperties={CacheEnabled=True, CacheSize=1024, SizePrefixDisabled=False, TightEncodingEnabled=True, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, StackTraceEnabled=True, TcpNoDelayEnabled=True} ]

SENDING: ConnectionInfo[ ConnectionId=ConnectionId[ Value=2957a0f0-a8b7-4abc-96ea-e5638b6d0baa ] ClientId=49b903c4-f55e-40b0-a78c-bde850b77ef8 Password= UserName= BrokerPath= BrokerMasterConnector=False Manageable=False ClientMaster=True ]
453 [-1225462896] DEBUG Apache.NMS.Tracer (null) - Parsing type: 2 with: Apache.NMS.ActiveMQ.OpenWire.V2.BrokerInfoMarshaller
463 [-1225462896] INFO Apache.NMS.Tracer (null) - RECEIVED: BrokerInfo[ BrokerId=BrokerId[ Value=ID:ubuntu-47413-1219891509239-0:0 ] BrokerURL=tcp://ubuntu:61616 PeerBrokerInfos=Apache.NMS.ActiveMQ.Commands.BrokerInfo[] BrokerName=localhost SlaveBroker=False MasterBroker=False FaultTolerantConfiguration=False DuplexConnection=False NetworkConnection=False ConnectionId=0 ]
463 [-1225462896] DEBUG Apache.NMS.Tracer (null) - Parsing type: 30 with: Apache.NMS.ActiveMQ.OpenWire.V2.ResponseMarshaller
463 [-1225462896] INFO Apache.NMS.Tracer (null) - RECEIVED: Response[ CorrelationId=1 ]
474 [-1211062496] INFO Apache.NMS.Tracer (null) - SENDING: SessionInfo[ SessionId=SessionId[ ConnectionId=2957a0f0-a8b7-4abc-96ea-e5638b6d0baa Value=1 ] ]
475 [-1225462896] DEBUG Apache.NMS.Tracer (null) - Parsing type: 30 with: Apache.NMS.ActiveMQ.OpenWire.V2.ResponseMarshaller
475 [-1225462896] INFO Apache.NMS.Tracer (null) - RECEIVED: Response[ CorrelationId=2 ]



Now you have a much better idea of what your client is chatting about with the server.
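Even without UseLogging, the DEBUG lines already carry enough to decode the numeric IDs, because each one names the marshaller used for that type. A small Ruby sketch (the line format is copied from the output above; the helper name is hypothetical) that builds a type-to-command table from such lines:

```ruby
# Build an OpenWire type-ID => command-name table from NMS DEBUG trace lines
# of the form "Parsing type: N with: ...<Command>Marshaller".
PARSE_LINE = /Parsing type: (\d+) with: \S*\.(\w+)Marshaller/

def openwire_types(log_lines)
  log_lines.each_with_object({}) do |line, table|
    m = PARSE_LINE.match(line)
    table[m[1].to_i] = m[2] if m
  end
end

log = <<~LOG
  Parsing type: 1 with: Apache.NMS.ActiveMQ.OpenWire.V1.WireFormatInfoMarshaller
  Parsing type: 2 with: Apache.NMS.ActiveMQ.OpenWire.V2.BrokerInfoMarshaller
  Parsing type: 30 with: Apache.NMS.ActiveMQ.OpenWire.V2.ResponseMarshaller
LOG

table = openwire_types(log.lines)
table.each { |id, name| puts "#{id} => #{name}" }
```

The regular expression does not care about the timestamp and logger prefix, so it works on the full log4net lines as well.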

Wednesday, August 27, 2008

NMS gotcha



I use the NMS C# client to get messages from ActiveMQ, and I ran into a problem: even though there were messages in the queue, my "consumer.ReceiveNoWait()" returned nothing.


After some code reading I realized the reason. My code looked like this:


using (IMessageConsumer consumer = session.CreateConsumer(queue, filter))
{
    IMessage oldMessage;
    while ((oldMessage = consumer.ReceiveNoWait()) != null)
        ...
}



Apparently ReceiveNoWait does not send any request to the queue server. Instead, it checks its own buffer of received messages.
When you create a message consumer, you notify the queue server that you want to receive messages from a certain destination (queue). But messages are not sent as the registration response, so right after registration you have a registered consumer but no messages yet.
So if you call consumer.ReceiveNoWait() right after registration, you will get null. You need to wait a little before the queue server sends messages to the consumer.
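The difference between ReceiveNoWait and a timed Receive shows up in a toy model of the consumer's local buffer. This Ruby sketch is not the NMS implementation: the class and method names are made up, and a background thread stands in for the broker dispatching messages shortly after the consumer registers.

```ruby
# Toy model of a consumer's local prefetch buffer; not the NMS implementation.
class ToyConsumer
  def initialize
    @buffer = []
    @lock = Mutex.new
    @arrived = ConditionVariable.new
  end

  # Called by the simulated broker when it dispatches a message.
  def deliver(msg)
    @lock.synchronize do
      @buffer << msg
      @arrived.signal
    end
  end

  # Like ReceiveNoWait: look only at what is already in the local buffer.
  def receive_no_wait
    @lock.synchronize { @buffer.shift }
  end

  # Like Receive(timeout): wait up to `timeout` seconds for a message.
  def receive(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      while @buffer.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @arrived.wait(@lock, remaining)
      end
      @buffer.shift
    end
  end
end

consumer = ToyConsumer.new
# The simulated broker starts dispatching a little after registration.
broker = Thread.new do
  sleep 0.02
  3.times { |i| consumer.deliver("msg #{i}") }
end

first_try = consumer.receive_no_wait # nothing has arrived yet
received = []
while (msg = consumer.receive(0.1))
  received << msg
end
broker.join

puts "ReceiveNoWait right away: #{first_try.inspect}"
puts "Receive(100 ms) loop got: #{received.inspect}"
```

The timed loop ends only after 100 ms pass with no new message, which is the same stopping rule as the fixed C# loop.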

Adding a 100 ms wait helps address this problem:


using (IMessageConsumer consumer = session.CreateConsumer(queue, filter))
{
    IMessage oldMessage;
    // wait up to 100 ms for each next message; stop when none arrives in time
    while ((oldMessage = consumer.Receive(TimeSpan.FromMilliseconds(100))) != null)
        ...
}

Monday, August 25, 2008

Camel internals

I'm playing with the Apache Camel project and, boy, it is not easy to understand its internals!
Let's see how it is initialized.

SpringCamelContext(DefaultCamelContext).doStart() line: 543
SpringCamelContext.maybeDoStart() line: 165
SpringCamelContext.doStart() line: 160
SpringCamelContext(ServiceSupport).start() line: 47
SpringCamelContext.maybeStart() line: 95
SpringCamelContext.onApplicationEvent(ApplicationEvent) line: 114

So maybeStart calls start which calls doStart which calls maybeDoStart which calls doStart.
Now a question from combinatorics: how many function names can we generate from the three words "maybe", "do" and "start"? :)))
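For the record: if a name uses each word at most once and word order matters (camelCase, as in maybeDoStart), the count is 3 + 6 + 6 = 15. A throwaway Ruby check, assuming exactly that naming rule:

```ruby
words = %w[maybe do start]

# Every ordered selection of 1..3 distinct words, joined in camelCase.
names = (1..words.size).flat_map do |k|
  words.permutation(k).map do |first, *rest|
    first + rest.map(&:capitalize).join
  end
end

puts names.size
puts names.join(' ')
```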

Monday, August 04, 2008

XMPP and web



It seems the XMPP protocol is finally getting the recognition it deserves. At OSCON 2008 there were some talks about using XMPP for async notifications.
Paper one. Paper two.

But as with any new technology, there is some confusion about how and when to use it.

So with XMPP my server will have to keep millions of connections?


If we replace HTTP polling with a client listening to the server, then all clients will have a persistent TCP connection to the server. If you have millions of clients, you are in trouble. Right?



No. An XMPP address (JID, or Jabber ID) looks like an email address, for example me@jabber.com; if you use Google Mail, your JID is the same as your Google email address. Now let's say your server is mycompany.com and your Jabber server is at xmpp.mycompany.com. If a Google client chats with a service on your server, there is no direct TCP connection between the client and your server. Instead, the client connects to Google and sends a message "to:room@xmpp.mycompany.com", and Google connects to xmpp.mycompany.com. Now, what happens when 100 Google clients join a chat room on your server? The Google server reuses the same TCP connection to send your server 100 "presence" XMPP packets. Handling even thousands of packets is not a big deal at all.

So your server will handle as many connections as there are unique domains among your clients, which should be a small enough number.
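The arithmetic is easy to check. A JID has the form [node@]domain[/resource], so the number of server-to-server links your server needs is roughly the number of distinct remote domains among the participants. A Ruby sketch with made-up JIDs (the helper name and the participant list are hypothetical):

```ruby
# Extract the domain part of a JID of the form [node@]domain[/resource].
def jid_domain(jid)
  bare = jid.split('/', 2).first  # drop the resource first; it may contain '@'
  bare.split('@', 2).last.downcase
end

# Hypothetical participants of a chat room hosted at xmpp.mycompany.com.
participants = [
  'alice@gmail.com/Home',
  'bob@gmail.com/Work',
  'carol@gmail.com',
  'dave@jabber.org/laptop',
  'erin@xmpp.mycompany.com'
]

remote = participants.map { |jid| jid_domain(jid) }.uniq - ['xmpp.mycompany.com']
puts "#{participants.size} users, #{remote.size} remote servers: #{remote.join(', ')}"
```

Three Google users cost one link to gmail.com, not three connections.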

What about JMS?

XMPP does not replace JMS (Java Message Service) or any other messaging service. The principal difference is that JMS can guarantee delivery of a message, or return an error if the message cannot be delivered in time. With XMPP, if a client is disconnected even for a short time, it will not see any messages sent while it was offline. In other words, XMPP is not for offline operation.

In JMS (or any other queue architecture), the sender can issue a message, and as soon as it hits the queue there is no need to stay connected. The client can go offline and, when it is back online, pick up all messages from the queue.

A good example for XMPP is a client application which shows the properties of an object. If another user in the system edits a property, the server can broadcast a message with the ID of the updated object, and every online application refreshes the property immediately. There is no need to receive those messages while a client is disconnected.

Now let's consider a replication system. Let's say a Corporation is integrated with some Service Provider. The Corporation has uploaded its data to the Service Provider, but now we have a dilemma: users can update data either in the legacy Corporation application or in the Service Provider interface.

The Service Provider can create a queue for the Corporation and add a DB trigger which dumps all changes to the queue. The Corporation has an application which listens to the queue and updates the legacy DB with changes made in the Service Provider application.

Now imagine that the Corporation has been disconnected from the Service Provider for some time (maintenance, networking problems, etc.). Obviously, when it is back online it is crucial to receive all messages issued during the offline period. XMPP is by no means the tool for this task. JMS is.
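The difference can be reduced to a toy model. The Ruby sketch below is not real XMPP or JMS code: one class delivers a broadcast only to clients that are online, the other keeps messages until they are consumed. All names are hypothetical; the "corp" client plays the Corporation from the scenario above.

```ruby
# Toy contrast between the two delivery models; not real XMPP or JMS code.

# XMPP-style broadcast: only clients that are online receive a message.
class Broadcast
  def initialize
    @online = {}
  end

  def connect(name)
    @online[name] = []
  end

  def disconnect(name)
    @online.delete(name)
  end

  def publish(msg)
    @online.each_value { |inbox| inbox << msg }
  end

  def inbox(name)
    @online.fetch(name, [])
  end
end

# JMS-style queue: a message waits in the queue until somebody consumes it.
class DurableQueue
  def initialize
    @messages = []
  end

  def publish(msg)
    @messages << msg
  end

  def drain
    @messages.shift(@messages.size)
  end
end

xmpp = Broadcast.new
jms = DurableQueue.new
publish_both = ->(msg) { xmpp.publish(msg); jms.publish(msg) }

xmpp.connect('corp')
publish_both.call('change 1')
seen_by_xmpp = xmpp.inbox('corp').dup

xmpp.disconnect('corp')          # maintenance window
publish_both.call('change 2')

xmpp.connect('corp')             # back online
publish_both.call('change 3')
seen_by_xmpp += xmpp.inbox('corp')

seen_by_jms = jms.drain
puts "XMPP client saw: #{seen_by_xmpp.inspect}"
puts "Queue consumer saw: #{seen_by_jms.inspect}"
```

"change 2" never reaches the broadcast client, while the queue consumer receives all three changes after the outage.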

XMPP does not provide reliability. (Correction: there is a publish-subscribe extension for XMPP, XEP-0060, which has a persistent items option. But not all servers implement it, so for queuing purposes it is still better to use a reputable queue system.)

When pull is better


XMPP helps in situations where updates happen infrequently. If message traffic is high, then each time you pull you can be quite sure you will get data, so there is little sense in implementing asynchronous notification, unless you have strict requirements on reaction time.

What about WS-Notification/SOAP?

The big problem is that HTTP is a one-way protocol: the client always initiates the request. Basically, the server never talks to the client first; it does not even know how to reach its clients. WS-Notification attempts to solve this problem by having the client call the server and tell it where the client can be found when an event happens. In corporate integration that may work: you assault your IT department and get a server where you put your little callback web service. But there are cases where it will never work. A web browser, generally speaking, cannot be reached because of firewalls. So WS-Notification is not a generic solution. XMPP is, because the client initiates the connection to the server and stays connected, so the server can always send an event to the client.


Friday, March 14, 2008

OpenID Provider Identifier vs Claimed Identifier



It is not emphasized in the OpenID specification (version 2.0), but when you log in to an OpenID-enabled site, you can either enter your Claimed Identifier, for example mylogin.pip.verisignlabs.com, or just your provider's address (an OP Identifier), for example "yahoo.com", and the site will figure out that the provider you entered supports OpenID.

How does it do this? Simple.
According to the spec, the URL you entered is normalized: "http://" is added and redirects are followed. If you enter "yahoo.com" in your browser, you will be redirected to "http://yahoo.com/". Now, let's see how the OpenID server location is discovered with the help of a small Ruby script.
require 'net/http'
h = Net::HTTP.new('www.yahoo.com')
resp = h.head('/', 'Accept' => 'application/xrds+xml')
puts "Code = #{resp.code}"
resp.each_key {|key| puts "#{key} = #{resp[key]}"}


This script issues an HTTP "HEAD" request to the Yahoo server. Note the "application/xrds+xml" Accept header. If the server supports OpenID, it should return a header indicating where its service description (the XRDS document) can be found.
The result is:
vadim@ubuntu:~/Projects/yadis_test$ ruby test.rb
last-modified = Sat, 15 Mar 2008 06:22:40 GMT
cache-control = private
vary = User-Agent
connection = close
p3p = policyref="http://p3p.yahoo.com/w3c/p3p.xml", CP="CAO DSP COR CUR ADM ..."
content-type = text/html; charset=utf-8
date = Sat, 15 Mar 2008 07:01:22 GMT
content-length = 9533
x-xrds-location = http://open.login.yahooapis.com/openid20/www.yahoo.com/xrds
accept-ranges = bytes
vadim@ubuntu:~/Projects/yadis_test$


Here we go. Yahoo's XRDS document, which tells where the OpenID server is, is located at http://open.login.yahooapis.com/openid20/www.yahoo.com/xrds
If you are curious enough, you can go to this URL and download the so-called "yadis" file, an XML declaration of the supported services and options.

Monday, March 03, 2008

Funny.
From RFC-2631: "In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is explicitly specified" :)

Sunday, January 06, 2008

Gnucash import script

I decided to track my finances with Gnucash. My bank allows downloading statements in QIF format, which Gnucash can import.
But here is the problem: during the import, the only way to classify your transactions (was it car maintenance, rent, or dining?) is to manually assign every imported transaction to the appropriate account. If I'm trying to analyze last year's expenses, that becomes virtually impossible to do by hand.
So I came up with a simple ruby script:


#!/usr/bin/ruby

# [regex matched against a QIF line, Gnucash account name]
$patterns = [
  [/^PSHELL OIL/, 'Expenses:Auto:Gas'],
  [/^PCHEVRON/, 'Expenses:Auto:Gas'],
  [/^PTHE BOXING CLUB/, 'Expenses:Entertainment:Recreation'],
  [/^PEDWARDS MIRA MESA STDM/, 'Expenses:Entertainment:Music/Movies'],
  [/^PULTRASTAR CINEMAS/, 'Expenses:Entertainment:Music/Movies'],
  [/^PHOBBYTOWN USA/, 'Expenses:Hobbies'],
]

# Append an "L<account>" line for the first pattern that matches any line
def setCategory(tx)
  tx.each do |line|
    $patterns.each do |pattern|
      rx = pattern[0]
      if rx.match(line)
        l = pattern[1]
        tx << "L#{l}"
        return
      end
    end #patterns
  end #line
end

out = File.new('out.qif', 'w')
tx = []
txCount = 0
missed = 0
while gets
  if /^\^/.match($_)
    # "^" ends a transaction: categorize it and write it out
    txCount += 1
    len = tx.length
    setCategory(tx)
    if len == tx.length
      # nothing matched: show the transaction so a pattern can be added
      puts tx
      puts "-------------------"
      missed += 1
    end
    out.puts tx
    out.puts '^'
    tx = []
  else
    tx << $_
  end
end
out.close
puts "Transactions: #{txCount} Missed: #{missed}"


This script reads stdin, adds a category to each transaction according to a set of regular expressions, and writes the result to out.qif. I gave only a subset of my regular expressions; you will have to modify them to match your bank's codes and your Gnucash account names.

The trick is to add an "L" (category) line to each transaction. The QIF file format can be found here. The "L" line must match the desired account name in your Gnucash setup.

To make the task easier, the script prints unrecognized transactions to stdout. You can look at them and add appropriate patterns until the count of unrecognized transactions becomes small.
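To make the record format concrete, here is a minimal Ruby sketch of the same idea applied to an in-memory file. A QIF transaction is a group of lines, each starting with a one-letter code (D date, T amount, P payee, L category), terminated by a "^" line. The sample records are made up, and `categorize` is a hypothetical helper that only mirrors what setCategory does.

```ruby
PATTERNS = [
  [/^PSHELL OIL/, 'Expenses:Auto:Gas'],
  [/^PULTRASTAR CINEMAS/, 'Expenses:Entertainment:Music/Movies']
]

# Return the record with an "L<account>" line appended for the first
# matching pattern, or nil if no line matches (an "unrecognized" record).
def categorize(record_lines, patterns = PATTERNS)
  record_lines.each do |line|
    patterns.each do |rx, account|
      return record_lines + ["L#{account}"] if rx.match?(line)
    end
  end
  nil
end

qif = <<~QIF
  !Type:Bank
  D01/05/2008
  T-32.10
  PSHELL OIL 57442
  ^
  D01/06/2008
  T-9.99
  PSOME NEW STORE
  ^
QIF

# Split into records at each "^" line (the header line stays in the first
# record, as in the script above) and drop the terminators.
records = qif.lines(chomp: true).slice_after('^').map { |r| r - ['^'] }

records.each do |rec|
  tagged = categorize(rec)
  puts(tagged ? tagged.last : "unrecognized: #{rec.join(' | ')}")
end
```

The second record has no matching pattern, so it is reported, which is exactly the signal to add another regular expression.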