sparkStreaming写入Phoenix代码以及部署过程中遇到的问题以及解决方案

写入代码

1
2
3
4
5
6
rdd.flatMap(x => x).map(json => {
meetingSignalSchema(JsonNull.handleNull("rowkey", json).toString, JsonNull.handleNull("meeting_id", json).toString, JsonNull.handleNull("event", json).toString, JsonNull.handleNull("enterprise_id", json).toString, JsonNull.handleNull("department_id", json).toString, JsonNull.handleNull("department_name", json).toString, JsonNull.handleNull("enterprise_name", json).toString, JsonNull.handleNull("service_type", json).toString, JsonNull.handleNull("scheduled_id", json).toString, JsonNull.handleNull("scheduled_title", json).toString, JsonNull.handleNull("meeting_type", json).toString, JsonNull.handleNull("profile_type", json).toString, JsonNull.handleNull("charge_uri", json).toString, JsonNull.handleNull("virtual_number", json).toString, JsonNull.handleNull("create_time", json).toString, JsonNull.handleNull("charge_entity", json).toString, JsonNull.handleNull("first_caller", json).toString, JsonNull.handleNull("first_callee", json).toString, JsonNull.handleNull("time_begin", json).toString, JsonNull.handleNull("time_end", json).toString, JsonNull.handleNull("duration", json).toString, JsonNull.handleNull("display_name", json).toString, JsonNull.handleNull("call_number", json).toString, JsonNull.handleNull("charge_user_count", json).toString, JsonNull.handleNull("user_count_peak", json).toString, JsonNull.handleNull("session_free", json).toString, JsonNull.handleNull("session_free_count", json).toString, JsonNull.handleNull("session_max_count", json).toString, JsonNull.handleNull("sum_user_count", json).toString, "1", JsonNull.handleNull("meet_level", json).toString, JsonNull.handleNull("group_id", json).toString, JsonNull.handleNull("domain_id", json).toString, JsonNull.handleNull("visible_department", json).toString, JsonNull.handleNull("mp_cloud_conf_no", json).toString, JsonNull.handleNull("zone_num", json).toString)
})
.saveToPhoenix((environment + confTable).toUpperCase,
Seq("id", "meeting_id", "event", "enterprise_id", "department_id", "department_name", "enterprise_name", "service_type","scheduled_id", "scheduled_title", "meeting_type", "profile_type", "charge_uri", "virtual_number","create_time", "charge_entity", "first_caller", "first_callee", "time_begin", "time_end", "duration","display_name", "call_number", "charge_user_count", "user_count_peak", "session_free", "session_free_count","session_max_count", "sum_user_count", "suport_call_quality_statis", "meet_level", "group_id", "domain_id", "visible_department", "mp_cloud_conf_no", "zone_num"),
zkUrl = Some(hbaseAddress+":2181"))

可以直接通过dataframe写Phoenix,但是遇到Phoenix列名是小写,但是saveToPhoenix会将dataframe的列名映射为大写导致列名不匹配的异常


阅读全文 »