S3: Remove duplicated region argument in client methods #297

Closed
zaharidichev opened this issue May 10, 2017 · 21 comments

@zaharidichev
Contributor

Currently we have:

final class S3Settings(val bufferType: BufferType,
                       val diskBufferPath: String,
                       val proxy: Option[Proxy],
                       val awsCredentials: AWSCredentials,
                       val s3Region: String,
                       val pathStyleAccess: Boolean)

But then we have methods such as:

  private[this] def s3Request(s3Location: S3Location,
                              region: String,
                              method: HttpMethod = HttpMethods.GET,
                              uriFn: (Uri => Uri) = identity)(implicit conf: S3Settings): HttpRequest

What's the point of passing a region argument if the region is already configured in the implicit S3Settings? This is confusing. IMO, we should drop the region argument and use the one in S3Settings.
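For illustration, a minimal sketch of the proposed change. The types below are simplified stand-ins, not the real Alpakka or Akka HTTP classes, and the host format is only a placeholder; the point is that the region comes from the implicit settings alone, so it cannot disagree with a separately passed argument.

```scala
// Stand-in types for illustration only; NOT the real Alpakka / Akka HTTP classes.
final case class S3Location(bucket: String, key: String)
final case class S3Settings(s3Region: String)
final case class HttpRequest(method: String, host: String, path: String)

object RequestSketch {
  // Before: the caller passes a region even though conf.s3Region already exists,
  // so the two values can silently disagree.
  def s3RequestOld(location: S3Location, region: String, method: String = "GET")(
      implicit conf: S3Settings): HttpRequest =
    HttpRequest(method, s"${location.bucket}.s3-${region}.amazonaws.com", s"/${location.key}")

  // After: the region is read from the implicit settings only.
  // The host format is a placeholder, not the real endpoint logic.
  def s3Request(location: S3Location, method: String = "GET")(
      implicit conf: S3Settings): HttpRequest =
    HttpRequest(method, s"${location.bucket}.s3-${conf.s3Region}.amazonaws.com", s"/${location.key}")
}
```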

@zaharidichev
Contributor Author

As a matter of fact I think it will be better to proceed with this before addressing #271

zaharidichev added a commit to zaharidichev/alpakka that referenced this issue May 11, 2017
@johanandren
Member

Would be good to have one of the previous s3 committers comment on this as it will break the code of pretty much everyone who used the current s3 client API.

Ping @thereisnospoon, @kobmic, @Scalahansolo, @svezfaz, @2Beaucoup

@svezfaz
Contributor

svezfaz commented May 11, 2017

I think cleaning up the API now makes sense. It seems there is more than one thing wrong with it:

object S3Stream {

  def apply(credentials: AWSCredentials, region: String)(implicit system: ActorSystem, mat: Materializer): S3Stream =
    new S3Stream(credentials, region, S3Settings(system))
}

private[alpakka] final class S3Stream(credentials: AWSCredentials,
                                      region: String,
                                      val settings: S3Settings)(implicit system: ActorSystem, mat: Materializer) {
...
}
  1. neither system nor materializer are actually needed to create a S3Stream. Only thing needed is an executionContext, and that could as well be passed to every method separately, instead of passing it at creation
  2. as per @zaharidichev's comment, we only really need the S3Settings object to create an S3Stream; the current hybrid approach makes for a confusing API
  3. S3Settings.apply taking an ActorSystem does not seem like good practice to me. If we want a default way to create S3Settings, we could just have an empty apply() that performs ConfigFactory.load() for us

Moreover, issues 1 and 2 get propagated to S3Client as well.

+1 to sort this out now :)

@zaharidichev
Contributor Author

@svezfaz Should we sort that out in this PR or create a new one? Also, if anyone else can chip in with comments, it would be greatly appreciated :)

@svezfaz
Contributor

svezfaz commented May 11, 2017

My take is that it won't be a lot of work, and at least we would break the API in a single pass instead of fragmenting the change. But it's up to you, of course.

@zaharidichev
Contributor Author

Alright, will finish it later tonight and push then :)

@zaharidichev
Contributor Author

@svezfaz While we are on the subject, what do you think about this piece of code:

private[alpakka] final class S3Stream(credentials: AWSCredentials,
                                      val settings: S3Settings)

S3Settings already contains AWSCredentials, so again this does not seem very clean. I think it's better to remove the ability to pass AWSCredentials separately. Everything that is configuration should come from S3Settings.

@kobmic
Contributor

kobmic commented May 11, 2017

@zaharidichev I like the possibility to pass AWSCredentials in S3Client (which then calls S3Stream). We always use IAM roles and never store AWS credentials in any config files. If I understand your proposal correctly, we would have to keep credentials in the config file?

@svezfaz
Contributor

svezfaz commented May 11, 2017

@kobmic agreed, the contract for S3Client should probably stay. It makes sense for simpler usages to just bypass having specific config files. It will just need to be changed internally in how it creates the underlying S3Stream, e.g. it could load the reference config and override only credentials and region.

@zaharidichev
Contributor Author

@kobmic The more I think about it, the clearer it is that you are quite right. @svezfaz So what do you think is the best course of action in that case? Do we leave it as it is? I did not quite get what you mean by "override". By the looks of it, we are currently obliged to pass a separate credentials object on construction no matter what.

@svezfaz
Contributor

svezfaz commented May 11, 2017

What I meant is that S3Stream could be created only by passing an S3Settings, while S3Client keeps its simpler API requiring only credentials and region.
The S3Client internals could look something like this:

final class S3Client(credentials: AWSCredentials, region: String) {

  val settings = S3Settings.apply(ConfigFactory.load())
    .copy(awsCredentials = credentials, s3Region = region)
  
  private[this] val impl = S3Stream(settings)
  ...
}

(this would require making S3Settings a case class.. which might be a good call anyway ;) )

@zaharidichev
Contributor Author

That makes sense. However, what happens in the opposite situation to the one @kobmic describes? Imagine a client of the API who does keep all the settings (including credentials and region) in the configuration file. They are then forced to provide these when constructing the S3Client, even though they will be loaded from the config file. That does not seem very intuitive to me. How about:

object S3Client {
  val MinChunkSize = 5242880

  def apply(): S3Client = new S3Client(S3Settings.apply(ConfigFactory.load()))
  
  def apply(credentials: AWSCredentials, region: String): S3Client = {
    val settings = S3Settings.apply(ConfigFactory.load())
      .copy(awsCredentials = credentials, s3Region = region)
    new S3Client(settings)
  }
}

final class S3Client(s3Settings: S3Settings) {
.......

This leaves it to the client of the API to choose whether to rely completely on the config file or override region and credentials.
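A self-contained sketch of the two construction paths, for illustration only. The types are simplified stand-ins rather than the real Alpakka or AWS SDK classes, and `fromConfig` is a hypothetical placeholder for `S3Settings.apply(ConfigFactory.load())` that returns fixed values instead of reading a config file.

```scala
// Stand-in types for illustration only; NOT the real Alpakka / AWS SDK classes.
final case class AWSCredentials(accessKeyId: String, secretAccessKey: String)

// A case class, so that copy(...) is available for overriding single fields.
final case class S3Settings(awsCredentials: AWSCredentials,
                            s3Region: String,
                            pathStyleAccess: Boolean)

object S3Settings {
  // Hypothetical placeholder for loading settings from the config file.
  def fromConfig(): S3Settings =
    S3Settings(AWSCredentials("config-key", "config-secret"), "us-east-1", pathStyleAccess = false)
}

final class S3Client(val s3Settings: S3Settings)

object S3Client {
  // Rely completely on the configuration.
  def apply(): S3Client = new S3Client(S3Settings.fromConfig())

  // Load the configuration, then override only credentials and region.
  def apply(credentials: AWSCredentials, region: String): S3Client =
    new S3Client(S3Settings.fromConfig().copy(awsCredentials = credentials, s3Region = region))
}
```

With this shape, `S3Client()` uses whatever the configuration provides, while `S3Client(credentials, region)` keeps every other setting (here `pathStyleAccess`) from the configuration.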

@svezfaz
Contributor

svezfaz commented May 11, 2017

@zaharidichev yes, that looks like what we need

@kobmic
Contributor

kobmic commented May 12, 2017

@zaharidichev looks good to me

@zaharidichev
Contributor Author

@svezfaz

neither system nor materializer are actually needed to create a S3Stream. Only thing needed is an executionContext, and that could as well be passed to every method separately, instead of passing it at creation

The method below, however, needs both the ActorSystem and the Materializer. Do you think it is better to pass everything that is needed implicitly on every call, where needed, instead of providing it on construction?

  private def signAndGet(request: HttpRequest): Future[HttpResponse] =
    for (req <- Signer.signedRequest(request, signingKey);
         res <- Http().singleRequest(req)) yield res

@svezfaz
Contributor

svezfaz commented May 12, 2017

@zaharidichev you're right - I missed the fact that it's using Http().singleRequest in a few places. This indeed requires a materializer. I would pass it in at creation - we can keep the mat.executionContext calls wherever needed.

Where do you see an ActorSystem is needed?

@zaharidichev
Contributor Author

zaharidichev commented May 12, 2017

Hi there. It's needed in calls such as Http().singleRequest(req), because:

object Http {
  def apply()(implicit system: ActorSystem): HttpExt = super.apply(system)
.....
}

The Materializer, in turn, is needed in Signer.signedRequest.

@zaharidichev
Contributor Author

So we really have two options. One is to pass things at construction time, the way it is now; the other is to pass whatever is needed as implicits on each call. I think we need to choose one of them and stick to it.

@svezfaz
Contributor

svezfaz commented May 12, 2017

Completely missed that as well, sorry :/
I would just pass them at construction.
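To make the two options concrete, here is a minimal sketch. `ActorSystem` and `Materializer` below are simplified stand-ins, not the real Akka types, and the class names and the string returned by `signAndGet` are hypothetical placeholders for the real signing and HTTP call.

```scala
// Stand-in types for illustration only; NOT the real Akka classes.
final case class ActorSystem(name: String)
final case class Materializer(name: String)

// Option 1: dependencies are supplied once, at construction.
final class StreamAtConstruction()(implicit system: ActorSystem, mat: Materializer) {
  def signAndGet(request: String): String =
    s"$request via ${system.name}/${mat.name}"
}

// Option 2: every method that needs the dependencies asks for them implicitly.
final class StreamPerCall {
  def signAndGet(request: String)(implicit system: ActorSystem, mat: Materializer): String =
    s"$request via ${system.name}/${mat.name}"
}
```

Option 1 keeps the method signatures short and fixes the dependencies for the lifetime of the instance; option 2 repeats the implicit parameter list on every method but lets each call site choose.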

@zaharidichev
Contributor Author

zaharidichev commented May 12, 2017

Think we should be all good now
#298

zaharidichev added a commit to zaharidichev/alpakka that referenced this issue May 14, 2017
@zaharidichev
Contributor Author

@svezfaz Does that look good as it is?

@johanandren johanandren added this to the 0.9 milestone May 18, 2017
s-soroosh added a commit to s-soroosh/alpakka that referenced this issue Jun 12, 2017
- Set default settings as default parameter
- Improve tests

Implement java dsl

[FTP] Critical fix for infinite loop of traversing "." and ".." directories

Upgrade to aws-java-sdk-dynamodb 1.11.106

Allow to pass a `SSLSocketFactory` to `MqttConnectionSettings`

FTP - attribute enrichment of FTPFile akka#153

Add KairosDB connector

= akka#135 Limit parallelism for the SqsSource (akka#163)

* Create a custom thread pool for the SqsSource and limit concurrency with the buffer size

* Provide AWSCredentials for all SqsSource tests

* Provide better java api

* Use ArrayDeque as FIFO queue
Messages for asserts in SqsSourceSettings

* Remove AmazonSQSAsyncClient from factory method

- Correct the javadoc

* Add implicit AmazonSQSAsync to factory method

- It now conforms again with the SqsSink
- Passing the clients from extern seems the preferred way (awslambda module)

* Update documentation to reflect thread pool usage

* Improvements from review by ktoso

- Replace IntStream with Source.range
- Use Sink.head instead of Sink.seq

=pro update akka http to 10.0.5 (akka#230)

Make Travis fail build on code format differences

=sqs fix typo in require in SqsSourceSettings (akka#228)

FTP - toPath sink akka#182

Add GCE Pubsub with publish and subscribe.

improve naming consistency of private vars.

PR feedback. Improve java api, java examples, better json marshalling.

use mapAsyncUnordered

s3: provide access to the returned response

s3: clean up test log

s3: add javadsl for request()

Time goes by, next try

Update connectors.md (akka#237)

Make external libs more visible (reactive kafka) (akka#229)

* Update TOC depth to 3, to show Reactive Kafka

Now people don't notice Kafka is in here since it's "external", expanding the TOC one more level makes it more visible.

WDYT?

* Update index.md

FTP: make lastModified test more robust (fixes akka#236)

Add SqsAckSink (akka#129)

Add SqsAckSink

* update elasticmq version
* update dependencies

Added possibility configure Sftp connection using private key akka#197.

- Added SftpIdentity case class to allow configuring private/public key,
- Added option to configure known_hosts file and test to check its usage.
- Added spec that should fail password based authentication and revert to private key one,
- Added docs paragraph to describe this option.

Upgrade to scalafmt 0.6.6

Remove deperecated binPack.callSite scalafmt setting

Format with updated scalafmt and fixed settings

S3 - add documentation akka#103

Fix alphabetical ordering in the docs

Separate out release docs

S3 path style access akka#64

Make SqsSourceTest less likely to fail

- Reduce amount of sent messages to 1 (multiple batch streaming is tested in the SqsSourceSpec)
- Increase timeout

Introduced "secure" boolean property for S3 which controls whether HTTPS is used akka#247

README: add scaladex, travis badges

And make docs links less scary to click on :)

Add CSV data transformation module (akka#213)

* Alpakka Issue akka#66: CSV component

* Alpakka Issue akka#66: revised CSV parser

* Alpakka Issue akka#60: CSV parsing stage

* wait for line end before issuing line

As the byte string may not contain a whole line the parser needs to read until a line end is reached.

* Add Java API and JUnit test; add a bit of documentation

* Introduce CsvToMap stage; more documentation

* Parse line even without line end at upstream finish

* Add Java API for CsvToMap; more documentation

* More restricted API, incorporated comments by @johanandren

* Format sequence as CSV in ByteString

* Add Scala CSV formatting stage

* Add Java API for CSV formatting; more docs

* Separate enums for Java and Scala DSLs

* Use Flow.fromGraph to construct flow

* Rename CsvFraming to CsvParsing

* Check for Byte Order Mark and ignore it for UTF-8

* Emit Byte Order Marks in formatting; CsvFormatting is just a map

* Byte Order Mark for Java API

* Add line number to error messages; sample files exported from third party software

* Use Charset directly instead of name

* csv: autoformatted files

* simplified dependency declaration

Fixes akka#60.

SQS flows + Embedded ElasticMQ akka#255

* Add a flow stage and use ElasticMQ
* Use flow-based stage for ACKs
* Use AmazonSQSAsync instead of AmazonSQSAsyncClient
* Using embedded ElasticMQ for tests

add SNS connector with publish sink akka#204

Await futures before performing assertion (fixes akka#235)

When an assertion fails after the test has already succeeded it will be
ignored, so Await the future before continuing with the check.

Document 'docker-compose' for running tests

Fail MqttSourceStage mat. value on connection loss

And increase the timeout. Might help with akka#189, or otherwise help generate a
better error message when it does happen again.

Ref akka#2 add IronMq integration

Refs akka#2 add at-least-one semantic to IronMq connector

Improve documentation and test coverage for IronMq ref akka#2

- Document the IronMq domain classes
- Document IronMq client
- Test the at-least-once producer/consumer mechanism
- Improve the IronMQ connector documentation

Ref akka#2 Preserve newline in reference.conf

Ref akka#2 Make seure the actor system is fully terminated after each test

Ref akka#2 Reformat code

Refs akka#2 define a different Committable and CommittableMessage for Java and Scala DSL

Refs akka#2 Fix typos in IronMQ documentations

Refs akka#2 Remove non needed Environment variables from TravisCI config file

Refs akka#2 Add a simple Java test and refactor the Java DSL to looks better in Java

FTP: Attempt to fix flaky test on Travis

Link to scaladex (akka#266)

s3: support for encryption, storage class, custom headers akka#109

s3: Added support for partial file download from S3 akka#264 (akka#265)

Add version info and links in index page (akka#273)

FTP - append mode for toPath sink + improved upstream failure handling akka#207

Fix broken recovery of EventSource (sse)

Replace scala.binaryVersion with scalaBinaryVersion (see akka#278)

Fix minor typo in alpakka MQTT Connector doc

Add Flow to support RabbitMQ RPC workflow akka#160

Changes Amqp sinks to materialize to Future[Done]. As currently it was
very difficult to determine when/if a sink failed due to a amqp error.

AMQP: add more options to configuration of the ConnectionFactory, akka#191

Directory sources akka#272

sse: Upgrade to Akka SSE 3 and make test more robust

CSV: Fixes ignored second double quote

S3: add listBucket method to S3 library (akka#253)

* Added recursive listBucket call to get all keys under a specific prefix.

* Properly using the request URI method and constructing queries with the Query type

* Added tests around query parsing

* Fixed formatting and removed recoverWithRetries on listbucket calls as they are already retried on the underlying layer with max-retries

* Using signAndGetAs instead of signAndGet as to not duplicate logic.

* Implemented quick fixes based on comments. Removed recursive call to get keys and used unfoldAsync to get all keys to run in constant memory.

* Added execution context. Fixed broken test

* Fixed formatting error.

* Cleaned up lisBucket call by added a ListBucketState object instead of the brutal type signature from earlier.

* Moved trait for listBucket into the def itself as to remove it from the public namespace.

azure-storage-queue connector akka#280

Add attribute parameters to sqs source settings akka#302

Formatting fix for akka#302

Streaming XML parser and utilities.

Prepare XML parser to join Alpakka family

Remove duplicated region argument in client methods akka#297

Build with Akka 2.5 as well

Add Azure Storage Queue documentation to TOC

Stub documentation for S3.listBucket

S3: fix formatting

Run the deployment only against Akka 2.4

PubSub: Add support for emulator host variables

Initial commit for apache geode connector

CSV: Emit all lines on completion akka#315

XML: make code in tests more consistent

Add whitesource plugin

Merge branch 'master' into add-kairosdb-connector

Add copyright header

update docker-compose

Make execution context optional in java api

Make execution context optional in scala api

remove ec from sink spec