Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ tarballs/
.stack-work/
*~
*#

# No nix & direnv setup
flake.nix
flake.lock
.envrc
.direnv
4 changes: 4 additions & 0 deletions conduit-extra/ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# ChangeLog for conduit-extra

## 1.3.8

* Gracefully handle when a subprocess started using `Data.Conduit.Process.sourceProcessWithStreams` closes its stdin. Fixes [#523](https://github.com/snoyberg/conduit/issues/523)

## 1.3.7

* Allow Data.Conduit.Network.Unix on Windows [#518](https://github.com/snoyberg/conduit/pull/518)
Expand Down
13 changes: 11 additions & 2 deletions conduit-extra/Data/Conduit/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHan
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
import Control.Exception (onException, throwIO, finally, bracket, catch)
import System.IO.Error (ioeGetErrorType, isResourceVanishedErrorType)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
Expand Down Expand Up @@ -143,16 +144,24 @@ sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
, (sourceStdout, closeStdout)
, (sourceStderr, closeStderr)
, sph) <- streamingProcess cp
let safeSinkStdin = sinkStdin `catchC` ignoreStdinClosed
safeCloseStdin = closeStdin `catch` ignoreStdinClosed
(_, resStdout, resStderr) <-
runConcurrently (
(,,)
<$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
<$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| safeSinkStdin) `finally` safeCloseStdin)
<*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
<*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
`finally` (closeStdout >> closeStderr)
`onException` terminateStreamingProcess sph
ec <- waitForStreamingProcess sph
return (ec, resStdout, resStderr)
where
ignoreStdinClosed :: forall m. (MonadIO m) => IOError -> m ()
ignoreStdinClosed e =
if isResourceVanishedErrorType (ioeGetErrorType e)
then pure ()
else liftIO (throwIO e)

-- | Like @sourceProcessWithStreams@ but providing the command to be run as
-- a @String@.
Expand Down
2 changes: 1 addition & 1 deletion conduit-extra/conduit-extra.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Cabal-version: >=1.10
Name: conduit-extra
Version: 1.3.7
Version: 1.3.8
Synopsis: Batteries included conduit: adapters for common libraries.
Description:
The conduit package itself maintains relative small dependencies. The purpose of this package is to collect commonly used utility functions wrapping other library dependencies, without depending on heavier-weight dependencies. The basic idea is that this package should only depend on haskell-platform packages and conduit.
Expand Down
9 changes: 9 additions & 0 deletions conduit-extra/test/Data/Conduit/ProcessSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Data.Conduit.ProcessSpec (spec, main) where
import Test.Hspec
import Test.Hspec.QuickCheck (prop)
import Data.Conduit
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import Data.Conduit.Process
import Control.Concurrent.Async (concurrently)
Expand Down Expand Up @@ -77,6 +78,14 @@ spec = describe "Data.Conduit.Process" $ do
CL.consume -- stdout
CL.consume -- stderr
`shouldReturn` (ExitSuccess, [mystr], [])
it "gracefully handles closed stdin" $ do
let blob = L.iterate (+1) 0
blobHead = L.toStrict $ L.take 10000 blob
sourceCmdWithStreams "head -c 10000"
(CC.sourceLazy blob)
(L.toStrict <$> CC.sinkLazy) -- stdout
CL.consume -- stderr
`shouldReturn` (ExitSuccess, blobHead, [])
#endif
it "blocking vs non-blocking" $ do
(ClosedStream, ClosedStream, ClosedStream, cph) <- streamingProcess (shell "sleep 1")
Expand Down
Loading