String together Storm-RMQ-PubNub sensor data topology
Let's wire in the JSON bolt to parse this JSON payload in our topology and see the execution:
...
public class JsonBolt implements IRichBolt {
..
private static final long serialVersionUID = 1L;
OutputCollector collector;
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
this.collector = arg2;
}
public void execute(Tuple arg0) {
String jsonInString = arg0.getStringByField("messages");
System.out.println("message read from queue" + jsonInString);
JsonConverter jsonconvertor = new JsonConverter();
MySensorData mysensorObj = jsonconvertor.run(jsonInString);
this.collector.emit(new Values(mysensorObj));
this.collector.ack(arg0);
}
..
}
..
class JsonConverter {
..
public MySensorData run(String jsonInString) {
ObjectMapper mapper = new ObjectMapper...