Support FileInputInputStream with multiple InputStreams #37

chikamura
Contributor

Currently, when a FileInputInputStream contains multiple InputStreams, only the first InputStream is read. For example, this test fails.
This happens because ParseContext.parse(InputStream json) closes the FileInputInputStream, so the next call to FileInputInputStream.nextFile() returns false.

I added a fix that avoids closing the stream. Could you please review this PR?
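
The failure described above can be reproduced with a small model. The sketch below is only an illustration under stated assumptions: MultiFileStream and parseAndClose are hypothetical stand-ins for FileInputInputStream and ParseContext.parse(InputStream json), written to mimic the behavior described in this PR (the parser closes its input, and nextFile() returns false once the stream is closed). It is not the actual Embulk or JsonPath code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class MultiFileCloseSketch {
    /** Hypothetical, simplified model of a stream that spans several files. */
    static final class MultiFileStream extends InputStream {
        private final Iterator<String> files;
        private byte[] current = new byte[0];
        private int pos = 0;
        private boolean closed = false;

        MultiFileStream(List<String> contents) {
            this.files = contents.iterator();
        }

        /** Moves to the next file; returns false when closed or exhausted. */
        boolean nextFile() {
            if (closed || !files.hasNext()) {
                return false;
            }
            current = files.next().getBytes(StandardCharsets.UTF_8);
            pos = 0;
            return true;
        }

        @Override
        public int read() {
            return pos < current.length ? (current[pos++] & 0xff) : -1;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for a parser that always closes the stream it was given. */
    static String parseAndClose(InputStream in) {
        try (InputStream s = in) {
            StringBuilder sb = new StringBuilder();
            for (int b = s.read(); b != -1; b = s.read()) {
                sb.append((char) b);
            }
            return sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Mirrors the loop shape in the parser plugin: one parse call per file. */
    static List<String> readAllFiles(List<String> contents) {
        MultiFileStream is = new MultiFileStream(contents);
        List<String> parsed = new ArrayList<>();
        while (is.nextFile()) {
            parsed.add(parseAndClose(is)); // closes is as a side effect
        }
        return parsed;
    }

    public static void main(String[] args) {
        // Three files go in, but only the first one is parsed.
        System.out.println(readAllFiles(Arrays.asList("[1]", "[2]", "[3]")));
    }
}
```

In this model the loop exits after the first iteration because the parse call has already closed the shared stream.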

@hiroyuki-sato
Owner

hiroyuki-sato commented Aug 23, 2023

Hello, @chikamura. Thank you for creating this PR.

I have some concerns.

  1. This PR parses multiple JSON objects in the same file, like the one below, correct? (This was my misunderstanding)
  2. This PR changes the current behavior, so I think it is better to introduce it as an option at first. (ex. multiple_json: true) (This was my misunderstanding)
  3. IIUC, ByteArrayOutputStream stores the whole file in memory, so if a user has a 100MB JSON object, it allocates 100MB of memory as an input buffer, correct? If so, I don't think it is a good implementation.
  4. Finally, does this PR work as you expected? The following config outputs just one record. I thought it would output two entries. (This was my misunderstanding)

This PR description.

cat test.json

[{"a":1, "b":2 }]
[{"a":2, "b":3 }]

  • Expected behavior: parse two entries. (This was my misunderstanding)
  • Actual (current) behavior: only the first part, {"a":1, "b":2 }, is parsed. (This was my misunderstanding)

Test result.

Sample json.

test.json file.

[{"a":1, "b":2 }]
[{"a":2, "b":3 }]

in:
  type: file
  path_prefix: test
  parser:
    type: jsonpath
#    root: "$.results"
    default_timezone: "Asia/Tokyo"
    columns:
      - { name: "a", type: long }
      - { name: "b", type: long }
out:
  type: stdout

Expected output.

1,2
2,3

Actual output.

1,2

@@ -158,8 +160,14 @@ public void run(TaskSource taskSource, Schema schema,
FileInputInputStream is = new FileInputInputStream(input);
while (is.nextFile()) {
final JsonNode json;
try {
json = JsonPath.using(JSON_PATH_CONFIG).parse(is).read(jsonRoot, JsonNode.class);
try (ByteArrayOutputStream bout = new ByteArrayOutputStream()) {
Collaborator

BTW:

How about the following solution, if you do not want parse(is) to call close()?
Wrapping the original input stream (is) in a temporary InputStream avoids allocating (and copying into) a new buffer just to parse the JSON.

final InputStream toParse = new InputStream() {
    @Override
    public int read() {
        return is.read();
    }
};

try  {
    json = JsonPath.using(JSON_PATH_CONFIG).parse(toParse).read(jsonRoot, JsonNode.class);
}
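
The effect of this wrapper can be checked in isolation. The following is a minimal sketch, assuming a parser that always closes the stream it is handed; TrackingStream, consumeAndClose, and nonClosing are hypothetical names introduced for illustration and are not part of Embulk or JsonPath. It relies on one documented fact: InputStream.close() does nothing unless a subclass overrides it, so closing the wrapper never reaches the wrapped stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class NonClosingWrapperSketch {
    /** Hypothetical source stream that records whether close() was called on it. */
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed = false;

        TrackingStream(String text) {
            super(text.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for a parser that closes whatever stream it reads. */
    static void consumeAndClose(InputStream in) {
        try (InputStream s = in) {
            while (s.read() != -1) {
                // discard; a real parser would build a document here
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Forwards reads to source; close() is inherited from InputStream and does nothing. */
    static InputStream nonClosing(final InputStream source) {
        return new InputStream() {
            @Override
            public int read() throws IOException {
                return source.read();
            }
        };
    }

    /** Returns whether the source ended up closed after one parse call. */
    static boolean sourceClosedAfterParse(boolean wrap) {
        TrackingStream source = new TrackingStream("{\"a\":1}");
        consumeAndClose(wrap ? nonClosing(source) : source);
        return source.closed;
    }

    public static void main(String[] args) {
        System.out.println("direct:  closed=" + sourceClosedAfterParse(false));
        System.out.println("wrapped: closed=" + sourceClosedAfterParse(true));
    }
}
```

One trade-off worth noting: a wrapper that overrides only read() serves bulk reads one byte at a time through the default InputStream.read(byte[], int, int). Overriding that method as well to forward to the source would likely be faster for large inputs.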

Contributor Author

@takumakanari
Many thanks! I think that is the better solution.

Collaborator

Please make sure to close the original stream (is) after the operations 👍

@chikamura
Contributor Author

chikamura commented Aug 23, 2023

@hiroyuki-sato
Thank you for your review!

This PR parses multiple JSON objects in the same file, like the one below, correct?

No, it doesn't; each file contains one JSON object. The problem occurs only with certain file input plugin implementations.
A file input plugin implements TransactionalFileInput open(TaskSource taskSource, int taskIndex), which returns a TransactionalFileInput.
In most plugins the TransactionalFileInput holds only one file (input stream), but the Embulk specification also allows multiple files (input streams).
In the latter case, the current implementation reads only the first file (input stream).

IIUC, ByteArrayOutputStream stores the whole file in memory, so if a user has a 100MB JSON object, it allocates 100MB of memory as an input buffer, correct? If so, I don't think it is a good implementation.

I will reconsider the implementation.
ParseContext.parse(InputStream json) always closes the input stream, so I used ParseContext.parse(String json) instead, but wrapping the input stream should work just as well.
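
For comparison, the buffering approach discussed here can be sketched as follows. This is an assumption-based illustration of the idea (copy the stream into a ByteArrayOutputStream, then build a String from it), not the code from this PR; readFully is a hypothetical helper. It shows why memory use grows with the input: the buffer ends up holding every byte of the file, and converting it to a String makes a further copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class BufferWholeInputSketch {
    /** Hypothetical helper: copies the whole stream into memory without closing it. */
    static byte[] readFully(InputStream in) {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        try {
            int n;
            while ((n = in.read(chunk)) != -1) {
                bout.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bout.toByteArray();
    }

    public static void main(String[] args) {
        byte[] input = new byte[1_000_000]; // stands in for a large JSON file
        Arrays.fill(input, (byte) ' ');
        byte[] buffered = readFully(new ByteArrayInputStream(input));
        String json = new String(buffered, StandardCharsets.UTF_8);
        System.out.println("buffered bytes: " + buffered.length + ", chars: " + json.length());
    }
}
```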

@hiroyuki-sato
Owner

@chikamura OK, I understand the problem. Could you provide code to reproduce it? Which InputPlugin uses multiple input streams? That would help me understand the problem.

@takumakanari Thank you for your comment!

@chikamura
Contributor Author

chikamura commented Aug 23, 2023

@hiroyuki-sato

Could you provide code to reproduce it? Which InputPlugin uses multiple input streams?

Could you please check this repository? https://github.com/trocco-io/embulk-input-http/tree/feature/cursor
This is a fork of the embulk-input-http plugin that adds support for requests with a cursor.
(I haven't found any other public repository that uses multiple input streams.)

config.yml

in:
  type: http
  url: http://express.heartrails.com/api/json
  params:
    - {name: method, value: getStations}
    - {name: x, value: 135.0}
    - {name: y, value: "35"}
  cursor:
    request_parameter_cursor_name: name
    response_parameter_cursor_json_path: '$.response.station[0].next'
  parser:
    type: jsonpath
    root: '$.response.station'
    columns:
      - name: name
        type: string

out: {type: stdout}

expected result

日本へそ公園
比延
黒田庄
黒田庄
本黒田
船町口
久下村
谷川
谷川

actual result on the current master branch (only the first HTTP request)

日本へそ公園
比延
黒田庄

@hiroyuki-sato
Owner

@chikamura Thanks. I could reproduce the problem in my environment.

@chikamura chikamura requested a review from takumakanari August 23, 2023 09:18
Collaborator

@takumakanari takumakanari left a comment

commented 🖊️

@@ -152,14 +153,21 @@ public void run(TaskSource taskSource, Schema schema,
final boolean stopOnInvalidRecord = task.getStopOnInvalidRecord();

// TODO: Use Exec.getPageBuilder after dropping v0.9
try (final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output)) {
try (final PageBuilder pageBuilder = new PageBuilder(Exec.getBufferAllocator(), schema, output);
final FileInputInputStream is = new FileInputInputStream(input)) {
Collaborator

suggestion:

why don't you try the following?

try (final FileInputInputStream is = new FileInputInputStream(input)) {

Contributor Author

Sorry, the indentation was wrong, which made the declaration of FileInputInputStream look as if it were outside try ().
I fixed the indentation.

Collaborator

I meant that it's better to make sure the FileInputInputStream is closed, so I suggested declaring is in the try block.

Contributor Author

It looks like pageBuilder and is are both already declared in the try block. (One try block can include multiple resources.)
Sorry if I misunderstood.
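
As a quick illustration of that point, here is a small self-contained sketch of a try-with-resources statement that declares two resources. The Named class is a hypothetical stand-in for PageBuilder and FileInputInputStream; only the language behavior is being shown. Java closes the resources in the reverse order of their declaration once the body finishes.

```java
import java.util.ArrayList;
import java.util.List;

class TryWithResourcesSketch {
    /** Hypothetical resource that logs its own close() call. */
    static final class Named implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Named(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Runs a two-resource try block and returns the order of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Named pageBuilder = new Named("pageBuilder", log);
             Named is = new Named("is", log)) {
            log.add("body");
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints [body, close is, close pageBuilder]
        System.out.println(run());
    }
}
```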

Contributor Author

@chikamura chikamura Aug 23, 2023

Maybe the try block for FileInputInputStream should enclose only the while (is.nextFile()) block?

Collaborator

I think so too 🙆‍♂️

while (is.nextFile()) {
// parse(InputStream json) cause is.close(), so wrapping the original is into a temporary InputStream.
Collaborator

👍

@chikamura chikamura requested a review from takumakanari August 23, 2023 10:04
@hiroyuki-sato
Owner

@chikamura @takumakanari Thank you for updating and reviewing the code.
Is this ready to merge now? (LGTM)

The following is just a memo for myself.

embulk-input-http fetches from the data source multiple times.

The results are split across multiple HTTP responses. These responses are independent JSON objects, like multiple files.
However, the plugin builds a single FileInputInputStream that contains multiple InputStreams.

2023-08-24 09:29:13.973 +0900 [INFO] (0015:task-0000): GET "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35"
2023-08-24 09:29:15.686 +0900 [INFO] (0015:task-0000): GET "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35&name=%E9%BB%92%E7%94%B0%E5%BA%84"
2023-08-24 09:29:15.754 +0900 [INFO] (0015:task-0000): GET "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35&name=%E6%9C%AC%E9%BB%92%E7%94%B0"
2023-08-24 09:29:15.799 +0900 [INFO] (0015:task-0000): GET "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35&name=%E8%88%B9%E7%94%BA%E5%8F%A3"
2023-08-24 09:29:15.840 +0900 [INFO] (0015:task-0000): GET "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35&name=%E4%B9%85%E4%B8%8B%E6%9D%91"
2023-08-24 09:29:15.986 +0900 [INFO] (0015:task-0000): GET "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35&name=%E8%B0%B7%E5%B7%9D"

% curl -Lv 'http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35'
*   Trying 35.75.165.181:80...
* Connected to express.heartrails.com (35.75.165.181) port 80 (#0)
> GET /api/json?method=getStations&x=135.0&y=35 HTTP/1.1
> Host: express.heartrails.com
> User-Agent: curl/7.88.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 24 Aug 2023 00:38:05 GMT
< Content-Type: application/json; charset=utf-8
< Content-Length: 572
< Connection: keep-alive
< Server: nginx
< Expires: Thu, 01 Dec 1994 16:00:00 GMT
< Pragma: no-cache
< X-Runtime: 1
< ETag: "952bd603b2f475e0e56ae31927adb679"
< Cache-Control: private, max-age=0, must-revalidate
< Access-Control-Allow-Origin: *
< Access-Control-Allow-Methods: GET, OPTIONS
< Access-Control-Allow-Headers: *
<
{"response":{"station":[{"name":"日本へそ公園","prefecture":"兵庫県","line":"JR加古川線","x":134.997633,"y":35.002069,"postal":"6770039","distance":"320m","prev":"比延","next":"黒田庄"},{"name":"比延","prefecture":"兵庫県","line":"JR加古川線","x":134.995733,"y":34.988773,"postal":"6770033","distance":"1310m","prev":"新西脇","next":"日本へそ公園"},{"name":"黒田庄","prefecture":"兵庫県","line":"JR加古川線","x":134.992522,"y":35.022689,"postal":"6790313","distance":"2620m","prev":"日本へそ公園","next":"本黒田"}]}}
* Connection #0 to host express.heartrails.com left intact
{
  "response": {
    "station": [
      {
        "name": "日本へそ公園",
        "prefecture": "兵庫県",
        "line": "JR加古川線",
        "x": 134.997633,
        "y": 35.002069,
        "postal": "6770039",
        "distance": "320m",
        "prev": "比延",
        "next": "黒田庄"
      },
      {
        "name": "比延",
        "prefecture": "兵庫県",
        "line": "JR加古川線",
        "x": 134.995733,
        "y": 34.988773,
        "postal": "6770033",
        "distance": "1310m",
        "prev": "新西脇",
        "next": "日本へそ公園"
      },
      {
        "name": "黒田庄",
        "prefecture": "兵庫県",
        "line": "JR加古川線",
        "x": 134.992522,
        "y": 35.022689,
        "postal": "6790313",
        "distance": "2620m",
        "prev": "日本へそ公園",
        "next": "本黒田"
      }
    ]
  }
}

% curl -Lv 'http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35&name=%E9%BB%92%E7%94%B0%E5%BA%84'
*   Trying 35.75.165.181:80...
* Connected to express.heartrails.com (35.75.165.181) port 80 (#0)
> GET /api/json?method=getStations&x=135.0&y=35&name=%E9%BB%92%E7%94%B0%E5%BA%84 HTTP/1.1
> Host: express.heartrails.com
> User-Agent: curl/7.88.1
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 24 Aug 2023 00:39:18 GMT
< Content-Type: application/json; charset=utf-8
< Content-Length: 192
< Connection: keep-alive
< Server: nginx
< Expires: Thu, 01 Dec 1994 16:00:00 GMT
< Pragma: no-cache
< X-Runtime: 1
< ETag: "3be1f77accfba140aa48670d77eb6e97"
< Cache-Control: private, max-age=0, must-revalidate
< Access-Control-Allow-Origin: *
< Access-Control-Allow-Methods: GET, OPTIONS
< Access-Control-Allow-Headers: *
<
{"response":{"station":[{"name":"黒田庄","prefecture":"兵庫県","line":"JR加古川線","x":134.992522,"y":35.022689,"postal":"6790313","prev":"日本へそ公園","next":"本黒田"}]}}
* Connection #0 to host express.heartrails.com left intact
{
  "response": {
    "station": [
      {
        "name": "黒田庄",
        "prefecture": "兵庫県",
        "line": "JR加古川線",
        "x": 134.992522,
        "y": 35.022689,
        "postal": "6790313",
        "prev": "日本へそ公園",
        "next": "本黒田"
      }
    ]
  }
}

....
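
The name parameter in the second request of the log is the percent-encoded form of 黒田庄, the value of station[0].next in the first response. The sketch below shows how such a follow-up URL could be built. It is only an assumption about the general shape of cursor handling, not the plugin's actual code; CursorParamSketch and nextUrl are hypothetical names. The station name is written with Unicode escapes so the example does not depend on the source file encoding.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class CursorParamSketch {
    /** Percent-encodes a cursor value as UTF-8 for use in a query string. */
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    /** Hypothetical helper: appends the cursor to the base URL as the name parameter. */
    static String nextUrl(String baseUrl, String cursor) {
        return baseUrl + "&name=" + encode(cursor);
    }

    public static void main(String[] args) {
        String base = "http://express.heartrails.com/api/json?method=getStations&x=135.0&y=35";
        // Station name taken from station[0].next in the first response.
        String cursor = "\u9ED2\u7530\u5E84";
        // Prints the same URL as the second GET line in the log.
        System.out.println(nextUrl(base, cursor));
    }
}
```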

@chikamura
Contributor Author

@hiroyuki-sato
Thank you for your review. It's ok with me.

Collaborator

@takumakanari takumakanari left a comment

LGTM 👍

@takumakanari
Collaborator

@hiroyuki-sato Approved. Sorry for the late reply 🙏

@hiroyuki-sato hiroyuki-sato merged commit 8e87d4d into hiroyuki-sato:master Aug 24, 2023
@hiroyuki-sato
Owner

Thanks!

@chikamura
Contributor Author

Many thanks! 🙏

@hiroyuki-sato
Owner

@chikamura I'll release the new version within a week if you don't plan to create another PR.

@chikamura
Contributor Author

@hiroyuki-sato
I have no plans to create another PR.
I'm looking forward to the new release.
