
(4) storm-kafka Source Code Walkthrough: Custom Scheme


This article is original; please credit the source when reposting.

Using KafkaSpout requires a class implementing Scheme. storm-kafka already ships implementations such as StringScheme and StringKeyValueScheme, which you can use out of the box.

These Scheme implementations are responsible for parsing the data you need out of the message stream.

public interface Scheme extends Serializable {
    public List<Object> deserialize(byte[] ser);
    public Fields getOutputFields();
}
A Scheme must implement the deserialization method and declare the output field names. Let's look at the simple StringScheme implementation:

public class StringScheme implements Scheme {
    public static final String STRING_SCHEME_KEY = "str";

    public List<Object> deserialize(byte[] bytes) {
        return new Values(deserializeString(bytes));
    }

    public static String deserializeString(byte[] string) {
        try {
            return new String(string, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
Under the hood it simply returns a String, so the spout emits tuples with a single field named "str". If you use StringScheme, a downstream Bolt can read the value with tuple.getStringByField("str"), as in the sketch below.
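A minimal bolt sketch (my addition, not from the original walkthrough; the class name is made up, everything else follows StringScheme above):

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrintStrBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // "str" is the field name declared by StringScheme.getOutputFields()
        System.out.println(tuple.getStringByField("str"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, nothing emitted downstream
    }
}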

Some may wonder why the earlier parts of this series used new SchemeAsMultiScheme(new StringScheme()). Have a look at the SchemeAsMultiScheme code:
public class SchemeAsMultiScheme implements MultiScheme {
    public final Scheme scheme;

    public SchemeAsMultiScheme(Scheme scheme) {
        this.scheme = scheme;
    }

    @Override
    public Iterable<List<Object>> deserialize(final byte[] ser) {
        List<Object> o = scheme.deserialize(ser);
        if (o == null) return null;
        else return Arrays.asList(o);
    }

    @Override
    public Fields getOutputFields() {
        return scheme.getOutputFields();
    }
}

public interface MultiScheme extends Serializable {
    public Iterable<List<Object>> deserialize(byte[] ser);
    public Fields getOutputFields();
}

It still just delegates to the scheme you pass in; it merely wraps the result into a list. Personally I think you could live without it, but storm-kafka expects a MultiScheme by default: KafkaUtils consults the configured scheme when parsing each message:

public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
    Iterable<List<Object>> tups;
    ByteBuffer payload = msg.payload();
    if (payload == null) {
        return null;
    }
    ByteBuffer key = msg.key();
    if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
        tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
    } else {
        tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
    }
    return tups;
}

So unless you have special requirements, stick with the storm-kafka defaults.
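For reference, a minimal wiring sketch (my addition, not from the original walkthrough; the zookeeper address, topic, zkRoot and spout id are placeholders). SpoutConfig, ZkHosts, KafkaSpout and the schemes live in storm.kafka; SchemeAsMultiScheme is in backtype.storm.spout:

BrokerHosts hosts = new ZkHosts("zkhost:2181");                                        // placeholder
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka", "my-spout-id"); // placeholders
// default case: tuples with the single field "str"
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// key/value case: makes the KeyValueSchemeAsMultiScheme branch in generateTuples fire
// spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
KafkaSpout spout = new KafkaSpout(spoutConfig);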


Examples

The messages kafka receives come in all shapes, and so does the data you want to emit downstream, so you will often need to write your own scheme. Two examples follow.


example 1

First: by default a single field is emitted, but what if you need to emit several? Say we want two. Someone has in fact already worked this out, adding the kafka offset to the emitted data (see the Reference at the end); the analysis goes as follows:

// returns false if it's reached the end of current batch
public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}
As seen above, when the tuple is emitted the offset is already attached as its messageId, so you might assume the Bolt receiving the tuple could recover the offset from the messageId. But look at the backtype.storm.daemon.executor code:

(log-message "Opening spout " component-id ":" (keys task-datas))
(doseq [[task-id task-data] task-datas
        :let [^ISpout spout-obj (:object task-data)
              tasks-fn (:tasks-fn task-data)
              send-spout-msg (fn [out-stream-id values message-id out-task-id]
                               (.increment emitted-count)
                               (let [out-tasks (if out-task-id
                                                 (tasks-fn out-task-id out-stream-id values)
                                                 (tasks-fn out-stream-id values))
                                     rooted? (and message-id has-ackers?)
                                     root-id (if rooted? (MessageId/generateId rand))
                                     out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))]
                                 ;; ...


This code shows that the tuple ids seen downstream are randomly generated (MessageId/generateId) and have nothing to do with the new KafkaMessageId(_partition, toEmit.offset) that KafkaSpout anchored with. So we have to add the offset to the emitted tuple ourselves, which means implementing our own Scheme. The code:

public class KafkaOffsetWrapperScheme implements Scheme {

    public static final String SCHEME_OFFSET_KEY = "offset";

    private String _offsetTupleKeyName;
    private Scheme _localScheme;

    public KafkaOffsetWrapperScheme() {
        _localScheme = new StringScheme();
        _offsetTupleKeyName = SCHEME_OFFSET_KEY;
    }

    public KafkaOffsetWrapperScheme(Scheme localScheme, String offsetTupleKeyName) {
        _localScheme = localScheme;
        _offsetTupleKeyName = offsetTupleKeyName;
    }

    public KafkaOffsetWrapperScheme(Scheme localScheme) {
        this(localScheme, SCHEME_OFFSET_KEY);
    }

    public List<Object> deserialize(byte[] bytes) {
        return _localScheme.deserialize(bytes);
    }

    public Fields getOutputFields() {
        List<String> outputFields = _localScheme.getOutputFields().toList();
        outputFields.add(_offsetTupleKeyName);
        return new Fields(outputFields);
    }
}


This scheme outputs two fields: one is str, deserialized by StringScheme (or by any other scheme you plug in), and the other is offset. But how does the offset actually get into the emitted tuple? We find the emit site in PartitionManager and patch it:

public EmitState next(SpoutOutputCollector collector) {
    if (_waitingToEmit.isEmpty()) {
        fill();
    }
    while (true) {
        MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
        if (toEmit == null) {
            return EmitState.NO_EMITTED;
        }
        Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
        if (tups != null) {
            for (List<Object> tup : tups) {
                // the added line: append the kafka offset as the last tuple field
                tup.add(toEmit.offset);
                collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
            }
            break;
        } else {
            ack(toEmit.offset);
        }
    }
    if (!_waitingToEmit.isEmpty()) {
        return EmitState.EMITTED_MORE_LEFT;
    } else {
        return EmitState.EMITTED_END;
    }
}


KafkaUtils.generateTuples(...) itself needs no change; it is exactly the method shown earlier.


At this point the offset has been successfully added to the emitted tuple. In a bolt you can read it with tuple.getValue(1) or by field name; note the offset is a long, so tuple.getLongByField("offset") is the call that fits (getStringByField would fail on a Long value).
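For example, a bolt-side execute() fragment (my sketch, reusing the field names from the scheme above):

// "str" comes from the wrapped StringScheme; "offset" was appended in PartitionManager
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String msg = tuple.getStringByField("str");
    long offset = tuple.getLongByField("offset");
    System.out.println(msg + " @ offset " + offset);
}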

The only thing left to do is to set the scheme to KafkaOffsetWrapperScheme when building the SpoutConfig.
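Something like this (my sketch, with the same placeholder config values as earlier); since KafkaOffsetWrapperScheme is a plain Scheme, it still needs the SchemeAsMultiScheme wrapper:

SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka", "my-spout-id"); // placeholders
// emits two fields: "str" (from the wrapped StringScheme) and "offset"
spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaOffsetWrapperScheme(new StringScheme()));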

example 2

Second: the messages stored in kafka may be in another format altogether, such as thrift, avro, or protobuf, in which case you have to implement the deserialization yourself.

Let's take an avro scheme as the example (no avro primer here; google it if you need one).

Suppose kafka holds avro-encoded messages with the following avro schema:

{"namespace": "example.avro", "type": "record", "name": "User", "fields": [     {"name": "name", "type": "string"},     {"name": "favorite_number",  "type": ["int", "null"]},     {"name": "favorite_color", "type": ["string", "null"]} ]}

Then we implement the Scheme interface:

public class AvroMessageScheme implements Scheme {

    private final static Logger logger = LoggerFactory.getLogger(AvroMessageScheme.class);

    private GenericRecord e2;
    private AvroRecord avroRecord;

    public AvroMessageScheme() {
    }

    @Override
    public List<Object> deserialize(byte[] bytes) {
        e2 = null;
        avroRecord = null;
        try {
            // NOTE: parsing the schema on every call is wasteful; caching it would be better
            InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("examples.avsc");
            Schema schema = new Schema.Parser().parse(is);
            DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            e2 = datumReader.read(null, decoder);
            avroRecord = new AvroRecord(e2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // on failure avroRecord stays null and a tuple carrying null is emitted
        return new Values(avroRecord);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}


What gets emitted downstream here is a POJO; you could just as well emit a String, which would be a bit more efficient, as sketched below.
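A String-emitting variant might look like this (my untested sketch, like the author's example; the schema file name "examples.avsc" and the output field "msg" are carried over from above). As a side benefit it caches the parsed schema instead of re-parsing it per message; since org.apache.avro.Schema is not Serializable but a Scheme gets shipped to workers, the cache is a transient field rebuilt lazily:

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class AvroJsonScheme implements Scheme {

    private transient Schema schema; // rebuilt lazily on each worker

    private Schema schema() {
        if (schema == null) {
            try {
                InputStream is = Thread.currentThread().getContextClassLoader()
                        .getResourceAsStream("examples.avsc");
                schema = new Schema.Parser().parse(is);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return schema;
    }

    public List<Object> deserialize(byte[] bytes) {
        try {
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema());
            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
            GenericRecord record = reader.read(null, decoder);
            // GenericRecord.toString() renders the record as JSON
            return new Values(record.toString());
        } catch (Exception e) {
            // returning null makes PartitionManager ack and skip the bad message
            return null;
        }
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}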
The AvroRecord POJO looks like this:

public class AvroRecord implements Serializable {

    // the logger was missing in the original snippet, which would not compile
    private final static Logger logger = LoggerFactory.getLogger(AvroRecord.class);

    private String name;
    private int favorite_number;
    private String favorite_color;

    public AvroRecord(GenericRecord gr) {
        try {
            this.name = String.valueOf(gr.get("name"));
            // gr.get() returns Object, so convert to String before parsing
            this.favorite_number = Integer.parseInt(String.valueOf(gr.get("favorite_number")));
            this.favorite_color = gr.get("favorite_color").toString();
        } catch (Exception e) {
            logger.error("read AvroRecord error!");
        }
    }

    @Override
    public String toString() {
        return "AvroRecord{" +
                "name='" + name + '\'' +
                ", favorite_number=" + favorite_number +
                ", favorite_color='" + favorite_color + '\'' +
                '}';
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getFavorite_color() { return favorite_color; }
    public void setFavorite_color(String favorite_color) { this.favorite_color = favorite_color; }

    public int getFavorite_number() { return favorite_number; }
    public void setFavorite_number(int favorite_number) { this.favorite_number = favorite_number; }
}

I have not tested this example myself, so use it with caution.


Reference

https://blog.deck36.de/no-more-over-counting-making-counters-in-apache-storm-idempotent-using-redis-hyperloglog/

