Membrane S3 sink

The membrane_s3_plugin (as of version 0.1.2) has become outdated due to changes in membrane_core. Here's an S3 sink implementation that works with the latest Membrane:

defmodule Membrane.S3.Sink do
  @moduledoc """
  Element that uploads incoming buffers to an S3 bucket using the S3
  multipart-upload API.

  Pipeline logs are directed to standard output by default. To separate them
  from the sink's output we recommend redirecting the logger to standard error.
  For simple use cases using the default logger configuration (like stand-alone
  scripts) this can be achieved by simply calling `redirect_logs_to_stderr/0`.
  """
  use Membrane.Sink

  alias ExAws.S3

  # S3 requires every part of a multipart upload except the last one
  # to be at least 5 MiB.
  @min_chunk_size 5 * 1024 * 1024

  def_options(
    bucket: [
      spec: String.t(),
      description: "Name of the S3 bucket"
    ],
    object: [
      spec: String.t(),
      description: "Object key in the S3 bucket"
    ]
  )

  def_input_pad(:input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any)

  @doc """
  Removes the default (stdout) logger handler and installs a console backend
  that writes to standard error, so pipeline logs don't mix with sink output.
  """
  @spec redirect_logs_to_stderr() :: :ok
  def redirect_logs_to_stderr() do
    :ok = :logger.remove_handler(:default)
    LoggerBackends.add(LoggerBackends.Console)
    LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)
  end

  @impl true
  def handle_init(_ctx, %__MODULE__{bucket: bucket, object: object}) do
    # Start the multipart upload right away; parts are streamed as buffers
    # arrive and the upload is completed on end of stream.
    {:ok, upload_id} = initiate_multipart_upload(bucket, object)

    {[],
     %{
       bucket: bucket,
       object: object,
       upload_id: upload_id,
       part_number: 1,
       # Accumulates payload bytes until a full part (>= @min_chunk_size) is ready.
       buffer: <<>>,
       # Uploaded parts as {part_number, etag}, newest first; reversed on completion.
       parts: []
     }}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {[demand: :input], state}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, %{buffer: existing_buffer} = state) do
    # Upload as many full-size parts as the accumulated data allows. Uploading
    # at most one part per incoming buffer (as before) let the accumulator grow
    # without bound whenever a single payload exceeded 2 * @min_chunk_size.
    state = upload_full_chunks(%{state | buffer: existing_buffer <> buffer.payload})
    {[demand: :input], state}
  end

  @impl true
  def handle_end_of_stream(:input, _ctx, state) do
    # Flush the remainder (S3 allows the final part to be smaller than 5 MiB)
    # and complete the multipart upload. Keep the updated state instead of
    # discarding it.
    {[], finalize_upload(state)}
  end

  # Drains the accumulator, uploading @min_chunk_size-sized parts until less
  # than one full part remains buffered.
  defp upload_full_chunks(%{buffer: buffer} = state) when byte_size(buffer) < @min_chunk_size,
    do: state

  defp upload_full_chunks(%{buffer: buffer} = state) do
    <<to_upload::binary-size(@min_chunk_size), remaining::binary>> = buffer
    part = upload_part(to_upload, state)

    upload_full_chunks(%{
      state
      | buffer: remaining,
        part_number: state.part_number + 1,
        parts: [part | state.parts]
    })
  end

  # Starts the multipart upload and returns {:ok, upload_id}.
  # Raises (via ExAws.request!/1 or the match) on any non-200 response.
  defp initiate_multipart_upload(bucket, object) do
    %{status_code: 200, body: %{upload_id: upload_id}} =
      S3.initiate_multipart_upload(bucket, object)
      |> ExAws.request!()

    {:ok, upload_id}
  end

  # Uploads a single part and returns {part_number, etag} as required by the
  # completion request. Raises if S3 rejects the part or no ETag is returned.
  defp upload_part(part, %{
         bucket: bucket,
         object: object,
         upload_id: upload_id,
         part_number: part_number
       }) do
    %{status_code: 200, headers: headers} =
      S3.upload_part(bucket, object, upload_id, part_number, part)
      |> ExAws.request!()

    {:ok, etag} = find_etag(headers)

    {part_number, etag}
  end

  # Case-insensitive lookup of the ETag response header. The surrounding
  # quotes are stripped so the bare value can be passed to
  # complete_multipart_upload.
  defp find_etag(headers) do
    headers
    |> Enum.find_value(
      {:error, :invalid_etag},
      fn {key, value} ->
        if String.downcase(key) == "etag" do
          {:ok, String.trim(value, "\"")}
        end
      end
    )
  end

  # Uploads any leftover bytes as the final part, then completes the multipart
  # upload. When no data arrived at all an empty part is uploaded, because S3
  # refuses to complete a multipart upload with zero parts.
  defp finalize_upload(state) do
    state =
      if byte_size(state.buffer) > 0 or state.parts == [] do
        part = upload_part(state.buffer, state)
        %{state | part_number: state.part_number + 1, buffer: <<>>, parts: [part | state.parts]}
      else
        state
      end

    finalize_multipart_upload(state)
    state
  end

  # Completes the multipart upload. Parts are stored newest-first, so they are
  # reversed into ascending part-number order as S3 requires.
  defp finalize_multipart_upload(%{
         bucket: bucket,
         object: object,
         upload_id: upload_id,
         parts: parts
       }) do
    S3.complete_multipart_upload(bucket, object, upload_id, Enum.reverse(parts))
    |> ExAws.request!()
  end
end