
Re: about PCollection process


If I understood correctly, you read some parameters from a file that you then use to prepare an HBase Scan. If this is the case, you cannot do it with the current HBaseIO API, but there is ongoing work to support it transparently with the new SDF API. If you want to track the progress, this is the JIRA
Hopefully it will be ready in the coming days/weeks.

In the meantime, a workaround is to apply a ParDo after you extract the scan parameters from the files, and inside that DoFn request the data directly from HBase, similar to what the SDF implementation does, for ref:
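For illustration, a minimal sketch of that workaround, a single flat DoFn that scans HBase per input element and emits KV pairs, so the output stays a flat PCollection rather than a nested one. The class name, the table id "records", and the start/end time parameters are assumptions; the "vin-timestamp" row-key layout comes from your snippet below:

```java
import java.io.IOException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Workaround sketch: scan HBase directly inside a DoFn instead of
// building a second Pipeline / nested PCollection per element.
public class ScanHBaseFn extends DoFn<String, KV<String, Result>> {

  private final String startTime;
  private final String endTime;
  private transient Connection connection;
  private transient Table table;

  public ScanHBaseFn(String startTime, String endTime) {
    this.startTime = startTime;
    this.endTime = endTime;
  }

  // Row keys follow the "<vin>-<timestamp>" layout from the original snippet.
  static String buildRowKey(String vin, String time) {
    return String.format("%s-%s", vin, time);
  }

  @Setup
  public void setup() throws IOException {
    // One HBase connection per DoFn instance, reused across bundles.
    Configuration conf = HBaseConfiguration.create();
    connection = ConnectionFactory.createConnection(conf);
    table = connection.getTable(TableName.valueOf("records")); // placeholder table id
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    String vin = c.element();
    Scan scan = new Scan()
        .setStartRow(Bytes.toBytes(buildRowKey(vin, startTime)))
        .setStopRow(Bytes.toBytes(buildRowKey(vin, endTime)));
    try (ResultScanner scanner = table.getScanner(scan)) {
      for (Result result : scanner) {
        c.output(KV.of(vin, result)); // flat output, no nested PCollection
      }
    }
  }

  @Teardown
  public void teardown() throws IOException {
    if (table != null) table.close();
    if (connection != null) connection.close();
  }
}
```

Downstream, a second ParDo (e.g. your existing Result2RecordNoModifyDataFn) can convert each Result into RecordData, giving you a plain PCollection<KV<String, RecordData>> directly.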

Hope this helps,

On Thu, Jul 5, 2018 at 4:53 AM Frank Li <surpass_li@xxxxxxxxxx> wrote:
        I'm running a Beam pipeline which uses TextIO to read lines from a text file, then a PTransform that searches HBase per line. The result is PCollection<PCollection<KV<String, RecordData>>>:

public PCollection<PCollection<KV<String, RecordData>>> expand(PCollection<String> lines) {

    PCollection<PCollection<KV<String, RecordData>>> results = lines
        .apply(ParDo.of(new DoFn<String, PCollection<KV<String, RecordData>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String vin = c.element();

                Pipeline pipelineHbase = Pipeline.create(c.getPipelineOptions());

                HBaseIO.Read read = HBaseIO.read()
                    .withConfiguration(conf)
                    .withTableId(tableId)
                    .withKeyRange(
                        Bytes.toBytes(String.format("%s-%s", vin, startTime)),
                        Bytes.toBytes(String.format("%s-%s", vin, endTime)));
                PCollection<Result> hbaseResults = pipelineHbase.apply(read);

                PCollection<KV<String, RecordData>> recordResults = hbaseResults
                    .apply(ParDo.of(new Result2RecordNoModifyDataFn()));

                c.output(recordResults);
            }
        }));

    return results;
}

How do I process a PCollection<PCollection<KV<String, RecordData>>>?